diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 6a3439ada9..5cc580a9c6 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 125c77a + GIT_TAG 509ec72 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 80fa7a7218..849d83a58b 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,7 @@ typedef struct { TXN txn; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 01d42d6950..840e7309fe 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -619,6 +619,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) #define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) +#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159) +#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/client/src/TMQConnector.c b/source/client/src/TMQConnector.c index fcf6957df9..26bf55055f 100644 --- a/source/client/src/TMQConnector.c +++ b/source/client/src/TMQConnector.c @@ -212,7 +212,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN tmq_commit_async(tmq, res, commit_cb, consumer); } -JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; if (tmq == NULL) { jniError("jobj:%p, tmq is closed", jobj); @@ -222,7 +222,7 @@ JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp( return tmq_unsubscribe((tmq_t *)tmq); } -JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp(JNIEnv *env, jobject jobj, +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; if (tmq == NULL) { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 7ce80553a0..75ccd44977 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -878,12 +878,18 @@ int hbMgrInit() { clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); TdThreadMutexAttr attr = {0}; - taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + int ret = taosThreadMutexAttrInit(&attr); assert(ret == 0); - taosThreadMutexInit(&clientHbMgr.lock, &attr); - taosThreadMutexAttrDestroy(&attr); + ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + assert(ret == 0); + + ret = taosThreadMutexInit(&clientHbMgr.lock, &attr); + assert(ret == 0); + + ret = taosThreadMutexAttrDestroy(&attr); + assert(ret == 0); // init handle funcs hbMgrInitHandle(); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 16b8e55cf7..8a8e6a83a5 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1446,6 +1446,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(numOfCols); int32_t rowSize = pBlock->info.rowSize; int32_t nRows = payloadSize / rowSize; + ASSERT(nRows >= 1); // the true value must be less than the value of nRows int32_t additional = 0; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ddda8f8c9a..64d376d265 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 4; +int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; @@ -365,8 +365,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = tsNumOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); + tsNumOfVnodeFetchThreads = 1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; @@ -385,9 +384,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; -// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; -// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); -// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; + // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); @@ -527,15 +526,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } -/* - pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfQnodeFetchThreads = numOfCores / 2; - tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); - pItem->i32 = tsNumOfQnodeFetchThreads; - pItem->stype = stype; - } -*/ + /* + pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfQnodeFetchThreads = numOfCores / 2; + tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + pItem->i32 = tsNumOfQnodeFetchThreads; + pItem->stype = stype; + } + */ pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -693,7 +692,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; @@ -941,10 +940,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -/* - } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { - tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; -*/ + /* + } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { + tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + */ } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 9931462e5f..dade85b12d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -146,6 +146,7 @@ struct SRSmaInfoItem { uint16_t nScanned; int32_t maxDelay; // ms tmr_h tmrId; + void *pStreamState; }; struct SRSmaInfo { @@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int32_t ref = T_REF_INC(pRSmaInfo); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ec8fcb2932..412def646f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { + tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); +} + +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { + tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); +} + static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } @@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { taosTmrStopA(&pItem->tmrId); } + if (isDeepFree && pItem->pStreamState) { + streamStateClose(pItem->pStreamState); + } + if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { @@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SRetention *pRetention = SMA_RETENTION(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); SVnode *pVnode = pSma->pVnode; + char taskInfDir[TSDB_FILENAME_LEN] = {0}; + void *pStreamState = NULL; + + // set the backend of stream state + tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); + if (!taosCheckExistFile(taskInfDir)) { + char *s = strdup(taskInfDir); + if (taosMulMkDir(taosDirName(s)) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(s); + return TSDB_CODE_FAILED; + } + taosMemoryFree(s); + } + pStreamState = streamStateOpen(taskInfDir, NULL, true); + if (!pStreamState) { + terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; + return TSDB_CODE_FAILED; + } + + SReadHandle handle = { .meta = pVnode->pMeta, .vnode = pVnode, .initTqReader = 1, + .pStateBackend = pStreamState, }; - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; @@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot + pItem->pStreamState = pStreamState; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->fetchLevel = pItem->level; taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, @@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; } - if (nTables <= 0) { smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type); return TSDB_CODE_SUCCESS; } +#if 0 // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) { goto _err; } +#endif // step 3: reload ts data from checkpoint if (tdRSmaRestoreTSDataReload(pSma) < 0) { @@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte return TSDB_CODE_SUCCESS; } +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + int32_t vid = SMA_VID(pSma); + + if (taosHashGetSize(pInfoHash) <= 0) { + return TSDB_CODE_SUCCESS; + } + + int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); + if (pRSmaStat->commitAppliedVer <= fsMaxVer) { + smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, + pRSmaStat->commitAppliedVer, fsMaxVer); + return TSDB_CODE_SUCCESS; + } + + void *infoHash = NULL; + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); + if (pItem && pItem->pStreamState) { + if (streamStateCommit(pItem->pStreamState) < 0) { + terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; + goto _err; + } + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid, + i + 1); + } + } + } + + return TSDB_CODE_SUCCESS; +_err: + smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); + return TSDB_CODE_FAILED; +} + +#if 0 int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; @@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); if (pRSmaStat->commitAppliedVer <= fsMaxVer) { smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, - pRSmaStat->commitAppliedVer, fsMaxVer); + pRSmaStat->commitAppliedVer, fsMaxVer); return TSDB_CODE_SUCCESS; } @@ -1579,6 +1661,8 @@ _err: return TSDB_CODE_FAILED; } +#endif + /** * @brief trigger to get rsma result in async mode * @@ -1926,7 +2010,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, - atomic_load_64(&pRSmaStat->nBufItems)); + atomic_load_64(&pRSmaStat->nBufItems)); break; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8841e5e16..cbe916d0f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - if (pRsp->withSchema) { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); - } else { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); - } + ASSERT(!pRsp->withSchema); + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { @@ -760,7 +757,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } @@ -774,7 +771,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 45fe29f0fa..5f03a82bc0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -320,6 +320,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->pSttBlk = NULL; if (index != -1) { + pIter->iSttBlk = index; pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d75f0580e1..b88589740d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -989,7 +989,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod if (pNode->output) { (*numOfOutputCols) += 1; - } else { + } else if (info != NULL) { + // select distinct tbname from stb where tbname='abc'; info->output = false; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9b502eded7..a23f58a732 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5297,12 +5297,12 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { } static void doModeAdd(SModeInfo* pInfo, char* data) { - int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { int32_t size = sizeof(SModeItem) + pInfo->colBytes; SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size); - memcpy(pItem->data, data, pInfo->colBytes); + memcpy(pItem->data, data, hashKeyBytes); pItem->count += 1; taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index ade5be4722..2c47ddea8b 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -17,6 +17,18 @@ #include "plannodes.h" #include "tdatablock.h" +#ifndef htonll + +#define htonll(x) \ + (((int64_t)x & 0x00000000000000ff) << 7 * 8) | (((int64_t)x & 0x000000000000ff00) << 5 * 8) | \ + (((int64_t)x & 0x0000000000ff0000) << 3 * 8) | (((int64_t)x & 0x00000000ff000000) << 1 * 8) | \ + (((int64_t)x & 0x000000ff00000000) >> 1 * 8) | (((int64_t)x & 0x0000ff0000000000) >> 3 * 8) | \ + (((int64_t)x & 0x00ff000000000000) >> 5 * 8) | (((int64_t)x & 0xff00000000000000) >> 7 * 8) + +#define ntohll(x) htonll(x) + +#endif + #define NODES_MSG_DEFAULT_LEN 1024 #define TLV_TYPE_ARRAY_ELEM 0 @@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV pEncoder->allocSize = pEncoder->allocSize * 2; } STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); - pTlv->type = type; - pTlv->len = len; + pTlv->type = htons(type); + pTlv->len = htonl(len); memcpy(pTlv->value, pValue, len); pEncoder->offset += tlvLen; ++(pEncoder->tlvCount); @@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) { } static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { + value = htons(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) { + value = htons(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { + value = htonl(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) { + value = htonl(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { + value = htonll(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) { + value = htonll(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } @@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) { } static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { + value = htons(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) { + value = htons(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { + value = htonll(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) { + value = htonll(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { - return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); + int64_t temp = *(int64_t*)&value; + temp = htonll(temp); + return tlvEncodeImpl(pEncoder, type, &temp, sizeof(temp)); } static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) { - return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); + int64_t temp = *(int64_t*)&value; + temp = htonll(temp); + return tlvEncodeValueImpl(pEncoder, &temp, sizeof(temp)); } static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { + value = htonl(value); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); } static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) { + value = htonl(value); return tlvEncodeValueImpl(pEncoder, &value, sizeof(value)); } @@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) { int16_t len = strlen(pValue); - int32_t code = tlvEncodeValueImpl(pEncoder, &len, sizeof(len)); + int32_t code = tlvEncodeValueI16(pEncoder, len); if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueImpl(pEncoder, pValue, len); } @@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co int32_t code = func(pObj, pEncoder); if (TSDB_CODE_SUCCESS == code) { STlv* pTlv = (STlv*)(pEncoder->pBuf + start); - pTlv->type = type; - pTlv->len = pEncoder->offset - start - sizeof(STlv); + pTlv->type = htons(type); + pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv)); } ++(pEncoder->tlvCount); return code; @@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun } if (TSDB_CODE_SUCCESS == code) { STlv* pTlv = (STlv*)(pEncoder->pBuf + start); - pTlv->type = type; - pTlv->len = pEncoder->offset - start - sizeof(STlv); + pTlv->type = htons(type); + pTlv->len = htonl(pEncoder->offset - start - sizeof(STlv)); } } return code; @@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { } *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); + (*pTlv)->type = ntohs((*pTlv)->type); + (*pTlv)->len = ntohl((*pTlv)->len); if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { return TSDB_CODE_FAILED; } @@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) { return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); } -static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { + int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohs(*pValue); + } + return code; +} static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohs(*pValue); + } + return code; } -static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { + int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohl(*pValue); + } + return code; +} static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohl(*pValue); + } + return code; } -static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { + int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohll(*pValue); + } + return code; +} static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohll(*pValue); + } + return code; } static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } @@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) { return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); } -static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { + int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohs(*pValue); + } + return code; +} static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohs(*pValue); + } + return code; } -static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { + int32_t code = tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohll(*pValue); + } + return code; +} static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int32_t code = tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + if (TSDB_CODE_SUCCESS == code) { + *pValue = ntohll(*pValue); + } + return code; } -static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { + int64_t temp = 0; + int32_t code = tlvDecodeI64(pTlv, &temp); + if (TSDB_CODE_SUCCESS == code) { + *pValue = *(double*)&temp; + } + return code; +} static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) { - return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue)); + int64_t temp = 0; + int32_t code = tlvDecodeValueI64(pDecoder, &temp); + if (TSDB_CODE_SUCCESS == code) { + *pValue = *(double*)&temp; + } + return code; } static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) { @@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { - PHY_INTERVAL_CODE_WINDOW = 1, - PHY_INTERVAL_CODE_INTERVAL, - PHY_INTERVAL_CODE_OFFSET, - PHY_INTERVAL_CODE_SLIDING, - PHY_INTERVAL_CODE_INTERVAL_UNIT, - PHY_INTERVAL_CODE_SLIDING_UNIT -}; +enum { PHY_INTERVAL_CODE_WINDOW = 1, PHY_INTERVAL_CODE_INLINE_ATTRS }; + +static int32_t physiIntervalNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; + + int32_t code = tlvEncodeValueI64(pEncoder, pNode->interval); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueI64(pEncoder, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueI64(pEncoder, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueI8(pEncoder, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueI8(pEncoder, pNode->slidingUnit); + } + + return code; +} static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_INTERVAL, pNode->interval); + code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_INLINE_ATTRS, physiIntervalNodeInlineToMsg, pNode); + } + + return code; +} + +static int32_t msgToPhysiIntervalNodeInline(STlvDecoder* pDecoder, void* pObj) { + SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj; + + int32_t code = tlvDecodeValueI64(pDecoder, &pNode->interval); + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueI64(pDecoder, &pNode->offset); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_OFFSET, pNode->offset); + code = tlvDecodeValueI64(pDecoder, &pNode->sliding); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_SLIDING, pNode->sliding); + code = tlvDecodeValueI8(pDecoder, &pNode->intervalUnit); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_INTERVAL_UNIT, pNode->intervalUnit); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_SLIDING_UNIT, pNode->slidingUnit); + code = tlvDecodeValueI8(pDecoder, &pNode->slidingUnit); } return code; @@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) { case PHY_INTERVAL_CODE_WINDOW: code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); break; - case PHY_INTERVAL_CODE_INTERVAL: - code = tlvDecodeI64(pTlv, &pNode->interval); - break; - case PHY_INTERVAL_CODE_OFFSET: - code = tlvDecodeI64(pTlv, &pNode->offset); - break; - case PHY_INTERVAL_CODE_SLIDING: - code = tlvDecodeI64(pTlv, &pNode->sliding); - break; - case PHY_INTERVAL_CODE_INTERVAL_UNIT: - code = tlvDecodeI8(pTlv, &pNode->intervalUnit); - break; - case PHY_INTERVAL_CODE_SLIDING_UNIT: - code = tlvDecodeI8(pTlv, &pNode->slidingUnit); + case PHY_INTERVAL_CODE_INLINE_ATTRS: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiIntervalNodeInline, pNode); break; default: break; diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index bf19c7a222..2b8e3d9864 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -473,10 +473,11 @@ class PlannerTestBaseImpl { cout << "nodesNodeToMsg: " << chrono::duration_cast(chrono::steady_clock::now() - start).count() << "us" << endl; + string copyStr(pStr, len); SNode* pNode = NULL; char* pNewStr = NULL; int32_t newlen = 0; - DO_WITH_THROW(nodesMsgToNode, pStr, len, &pNode) + DO_WITH_THROW(nodesMsgToNode, copyStr.c_str(), len, &pNode) DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5234f09175..fde8bca77b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -18,14 +18,19 @@ #include "tcommon.h" #include "ttimer.h" -SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + char statePath[300]; - sprintf(statePath, "%s/%d", path, pTask->taskId); + if (!specPath) { + sprintf(statePath, "%s/%d", path, pTask->taskId); + } else { + memcpy(statePath, path, 300); + } if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { goto _err; } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index c6ecd37680..c5204ef59e 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx // copy content to the parent page tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0); tdbPageCopy(pNews[0], pParent, 1); + + if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) { + ((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno; + } } for (int i = 0; i < 3; i++) { diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 2cc62d3d6a..543ffc55b6 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; - // tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; - // tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 93ced912f8..5284aeff77 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) { char* walMetaSerialize(SWal* pWal) { char buf[30]; ASSERT(pWal->fileInfoSet); - int sz = pWal->fileInfoSet->size; + int sz = taosArrayGetSize(pWal->fileInfoSet); cJSON* pRoot = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject(); cJSON* pFiles = cJSON_CreateArray(); @@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) { int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); if (code == 0) { sscanf(name, "meta-ver%d", &metaVer); + wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, metaVer); break; } + wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name); } taosCloseDir(&pDir); regfree(&walMetaRegexPattern); @@ -422,6 +424,7 @@ int walLoadMeta(SWal* pWal) { // find existing meta file int metaVer = walFindCurMetaVer(pWal); if (metaVer == -1) { + wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer); return -1; } char fnameStr[WAL_FILE_LEN]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 50c42ff170..1906a77127 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")