diff --git a/source/libs/executor/inc/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h similarity index 80% rename from source/libs/executor/inc/dataSinkMgt.h rename to include/libs/executor/dataSinkMgt.h index d13423b25d..a0819fcf85 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -21,13 +21,11 @@ extern "C" { #endif #include "os.h" -#include "executorimpl.h" +#include "thash.h" -#define DS_CAPACITY_ENOUGH 1 -#define DS_CAPACITY_FULL 2 -#define DS_NEED_SCHEDULE 3 -#define DS_END 4 -#define DS_IN_PROCESS 5 +#define DS_BUF_LOW 1 +#define DS_BUF_FULL 2 +#define DS_BUF_EMPTY 3 struct SDataSink; struct SSDataBlock; @@ -42,15 +40,20 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); typedef void* DataSinkHandle; typedef struct SInputData { - const SSDataBlock* pData; + const struct SSDataBlock* pData; SHashObj* pTableRetrieveTsMap; } SInputData; -typedef struct SOutPutData { +typedef struct SOutputData { int32_t numOfRows; int8_t compressed; char* pData; -} SOutPutData; + bool queryEnd; + bool needSchedule; + int32_t bufStatus; + int64_t useconds; + int8_t precision; +} SOutputData; /** * Create a subplan's datasinker handle for all later operations. @@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH * @param pRes * @return error code */ -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -void dsEndPut(DataSinkHandle handle); +void dsEndPut(DataSinkHandle handle, int64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle - * @return data length + * @param pLen data length */ -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); /** * Get data, the caller needs to allocate data memory. @@ -84,7 +87,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); * @param pStatus output * @return error code */ -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput); /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 1bbf5494dd..69727626af 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -31,10 +31,10 @@ typedef struct SDataSinkManager { pthread_mutex_t mutex; } SDataSinkManager; -typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); -typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); -typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef struct SDataSinkHandle { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d8e51d04d..e4b0557bff 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -19,6 +19,7 @@ #include "tcompression.h" #include "tglobal.h" #include "tqueue.h" +#include "executorimpl.h" #define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) @@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle { STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t status; + bool queryEnd; + int64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -124,7 +127,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; + int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); + int32_t status = (0 == blockNums ? DS_BUF_EMPTY : + (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -137,7 +142,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { return status; } -static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { @@ -145,38 +150,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pStatus = updateStatus(pDispatcher); + *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle) { +static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_END; + pDispatcher->queryEnd = true; + pDispatcher->useconds = useconds; pthread_mutex_unlock(&pDispatcher->mutex); } -static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { +static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; - return 0; + *pQueryEnd = pDispatcher->queryEnd; + *pLen = 0; + return; } SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); - return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } -static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->compressed = pEntry->compressed; tfree(pDispatcher->nextOutput.pData); // todo persistent - *pStatus = updateStatus(pDispatcher); + pOutput->bufStatus = updateStatus(pDispatcher); + pthread_mutex_lock(&pDispatcher->mutex); + pOutput->queryEnd = pDispatcher->queryEnd; + pOutput->needSchedule = false; + pOutput->useconds = pDispatcher->useconds; + pOutput->precision = pDispatcher->schema.precision; + pthread_mutex_unlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } @@ -205,7 +218,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; dispatcher->schema = pDataSink->schema; - dispatcher->status = DS_CAPACITY_ENOUGH; + dispatcher->status = DS_BUF_EMPTY; + dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 8a96c5d05f..e3f0cd7eaa 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH return TSDB_CODE_FAILED; } -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); + return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle) { +void dsEndPut(DataSinkHandle handle, int64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fEndPut(pHandleImpl); + return pHandleImpl->fEndPut(pHandleImpl, useconds); } -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) { +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetLen(pHandleImpl, pStatus); + pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); } -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); + return pHandleImpl->fGetData(pHandleImpl, pOutput); } void dsScheduleProcess(void* ahandle, void* pItem) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ab3aa02f4a..cac80c0a5f 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "cJSON.h" #include "os.h" #include "taoserror.h" @@ -33,6 +32,24 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } +void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { + char* limit; + + if (nlen == 0 || hlen < nlen) { + return false; + } + + limit = haystack + hlen - nlen + 1; + while ((haystack = (char*)memchr( + haystack, needle[0], limit - haystack)) != NULL) { + if (memcmp(haystack, needle, nlen) == 0) { + return haystack; + } + haystack++; + } + return NULL; +} + static inline int64_t walScanLogGetLastVer(SWal* pWal) { ASSERT(pWal->fileInfoSet != NULL); int sz = taosArrayGetSize(pWal->fileInfoSet); @@ -47,7 +64,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { struct stat statbuf; stat(fnameStr, &statbuf); - int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size); + int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size); + pLastFileInfo->fileSize = statbuf.st_size; FileFd fd = taosOpenFileRead(fnameStr); if (fd < 0) { @@ -64,6 +82,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } + taosLSeekFile(fd, -readSize, SEEK_END); if (readSize != taosReadFile(fd, buf, readSize)) { free(buf); taosCloseFile(fd); @@ -71,21 +90,25 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } - char* found = strstr(buf, (const char*)&magic); - if (found == NULL) { - ASSERT(false); - // file has to be deleted - free(buf); - taosCloseFile(fd); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; - } - char *another; - while((another = strstr(found + 1, (const char*)&magic)) != NULL) { + char* haystack = buf; + char* found = NULL; + char *candidate; + while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { // read and validate - SWalHead *logContent = (SWalHead*)another; + SWalHead *logContent = (SWalHead*)candidate; if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { - found = another; + found = candidate; + } + haystack = candidate + 1; + } + if (found == buf) { + SWalHead *logContent = (SWalHead*)found; + if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { + // file has to be deleted + free(buf); + taosCloseFile(fd); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; } } taosCloseFile(fd); @@ -120,7 +143,6 @@ int walCheckAndRepairMeta(SWal* pWal) { SWalFileInfo fileInfo; memset(&fileInfo, -1, sizeof(SWalFileInfo)); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); - FileFd fd = taosOpenFileRead(ent->d_name); //get lastVer //get size taosArrayPush(pLogInfoArray, &fileInfo); @@ -137,28 +159,25 @@ int walCheckAndRepairMeta(SWal* pWal) { } int newSz = taosArrayGetSize(pLogInfoArray); // case 1. meta file not exist / cannot be parsed - if (pWal->fileInfoSet == NULL && newSz != 0) { - // recover fileInfo set - pWal->fileInfoSet = pLogInfoArray; - if (newSz != 0) { - // recover meta version - pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer; - pWal->writeCur = newSz - 1; - } - // recover file size - } else if (oldSz < newSz) { + if (oldSz < newSz) { for (int i = oldSz; i < newSz; i++) { SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); taosArrayPush(pWal->fileInfoSet, pFileInfo); } + pWal->writeCur = newSz - 1; + pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer; + pWal->vers.lastVer = walScanLogGetLastVer(pWal); + ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer; + ASSERT(pWal->vers.lastVer != -1); + + int code = walSaveMeta(pWal); + if (code < 0) { + taosArrayDestroy(pLogInfoArray); + return -1; + } } - if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) { - pWal->vers.lastVer = walScanLogGetLastVer(pWal); - ASSERT(pWal->vers.lastVer != -1); - } - // case 2. versions in meta not match log // or some log not included in meta // (e.g. program killed) @@ -182,14 +201,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } #endif - int code = walSaveMeta(pWal); - if (code < 0) { - return -1; - } // get last version of this file // // rebuild meta + taosArrayDestroy(pLogInfoArray); return 0; } @@ -397,6 +413,10 @@ int walLoadMeta(SWal* pWal) { } memset(buf, 0, size + 5); FileFd fd = taosOpenFileRead(fnameStr); + if (fd < 0) { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } if (taosReadFile(fd, buf, size) != size) { terrno = TAOS_SYSTEM_ERROR(errno); taosCloseFile(fd); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index d12acb52c6..d5c28d9d9b 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { // init write buffer memset(&pWal->writeHead, 0, sizeof(SWalHead)); pWal->writeHead.head.headVer = WAL_HEAD_VER; + pWal->writeHead.magic = WAL_MAGIC; if (pthread_mutex_init(&pWal->mutex, NULL) < 0) { taosArrayDestroy(pWal->fileInfoSet); @@ -121,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) { + walLoadMeta(pWal); + + if (walCheckAndRepairMeta(pWal) < 0) { taosRemoveRef(tsWal.refSetId, pWal->refId); pthread_mutex_destroy(&pWal->mutex); taosArrayDestroy(pWal->fileInfoSet); @@ -130,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } if (walCheckAndRepairIdx(pWal) < 0) { + } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index a95c75b11d..b65a200ca1 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -340,7 +340,10 @@ TEST_F(WalRetentionEnv, repairMeta1) { char buf[100]; sprintf(buf, "%s/meta-ver%d", pathName, 0); remove(buf); + sprintf(buf, "%s/meta-ver%d", pathName, 1); + remove(buf); SetUp(); + //getchar(); ASSERT_EQ(pWal->vers.lastVer, 99); @@ -377,4 +380,26 @@ TEST_F(WalRetentionEnv, repairMeta1) { ASSERT_EQ(code, 0); } + for (int i = 0; i < 1000; i++) { + int ver = rand() % 200; + code = walReadWithHandle(pRead, ver); + ASSERT_EQ(code, 0); + + // printf("rrbody: \n"); + // for(int i = 0; i < pRead->pHead->head.len; i++) { + // printf("%d ", pRead->pHead->head.body[i]); + //} + // printf("\n"); + + ASSERT_EQ(pRead->pHead->head.version, ver); + ASSERT_EQ(pRead->curVersion, ver + 1); + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, ver); + int len = strlen(newStr); + ASSERT_EQ(pRead->pHead->head.len, len); + for (int j = 0; j < len; j++) { + EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); + } + } + } diff --git a/tests/script/sim/db/basic1.sim b/tests/script/sim/db/basic1.sim index 52af7d93ea..33af1c5b59 100644 --- a/tests/script/sim/db/basic1.sim +++ b/tests/script/sim/db/basic1.sim @@ -85,7 +85,6 @@ if $data02 != 2 then return -1 endi -return system sh/exec.sh -n dnode1 -s stop -x SIGKILL system sh/exec.sh -n dnode1 -s start @@ -104,4 +103,4 @@ if $rows != 2 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT