From b85dba328c5f823921321997a110b36939ffd52c Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 15 Aug 2024 03:55:20 +0000 Subject: [PATCH 01/24] fix/TD-30849 --- include/libs/wal/wal.h | 6 +++++- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 ++--- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 1 + source/dnode/mgmt/node_util/inc/dmUtil.h | 2 ++ source/dnode/mnode/impl/src/mndDump.c | 2 +- source/dnode/vnode/inc/vnode.h | 4 +++- source/dnode/vnode/src/vnd/vnodeModule.c | 4 ++-- source/libs/wal/src/walMgmt.c | 10 +++++++++- source/libs/wal/src/walWrite.c | 15 +++++++++++++++ 10 files changed, 41 insertions(+), 10 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 66ea5ea5c7..f74e26eeda 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -95,6 +95,7 @@ typedef struct { } SWalCkHead; #pragma pack(pop) +typedef void (*stopDnodeFn)(); typedef struct SWal { // cfg SWalCfg cfg; @@ -119,6 +120,9 @@ typedef struct SWal { char path[WAL_PATH_LEN]; // reusable write head SWalCkHead writeHead; + + stopDnodeFn stopDnode; + } SWal; typedef struct { @@ -152,7 +156,7 @@ typedef struct SWalReader { } SWalReader; // module initialization -int32_t walInit(); +int32_t walInit(stopDnodeFn stopDnode); void walCleanUp(); // handle open and ctl diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 20802e33d9..48606b2ed9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -85,7 +85,7 @@ static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode) } static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { int32_t code = 0; - if ((code = walInit()) != 0) { + if ((code = walInit(pInput->stopDnodeFp)) != 0) { dError("failed to init wal since %s", tstrerror(code)); return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6bc0b5fe93..e599676cec 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -624,8 +624,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { goto _OVER; } tmsgReportStartup("vnode-tfs", "initialized"); - - if ((code = walInit()) != 0) { + if ((code = walInit(pInput->stopDnodeFp)) != 0) { dError("failed to init wal since %s", tstrerror(code)); goto _OVER; } @@ -638,7 +637,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("vnode-sync", "initialized"); - if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) { + if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) { dError("failed to init vnode since %s", tstrerror(code)); goto _OVER; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 9819c4f64e..0a75847d96 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -414,6 +414,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, .getQnodeLoadsFp = dmGetQnodeLoads, + .stopDnodeFp = dmStop, }; opt.msgCb = dmGetMsgcb(pWrapper->pDnode); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 3b94f00bee..5be41f830d 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -121,6 +121,7 @@ typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); +typedef void (*StopDnodeFp)(); typedef struct { int32_t dnodeId; @@ -159,6 +160,7 @@ typedef struct { GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp; + StopDnodeFp stopDnodeFp; } SMgmtInputOpt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c index 31e092f1a4..565e244014 100644 --- a/source/dnode/mnode/impl/src/mndDump.c +++ b/source/dnode/mnode/impl/src/mndDump.c @@ -581,7 +581,7 @@ void mndDumpSdb() { msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefault(&msgCb); - (void)walInit(); + (void)walInit(NULL); (void)syncInit(); SMnodeOpt opt = {.msgCb = msgCb}; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d01db56013..2f56aac7d6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -49,7 +49,9 @@ typedef struct SVSnapWriter SVSnapWriter; extern const SVnodeCfg vnodeCfgDefault; -int32_t vnodeInit(int32_t nthreads); +typedef void (*StopDnodeFp)(); + +int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs); bool vnodeShouldRemoveWal(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 8b7de7058c..709bfa19bc 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -18,13 +18,13 @@ static volatile int32_t VINIT = 0; -int vnodeInit(int nthreads) { +int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) { if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) { return 0; } TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads)); - TAOS_CHECK_RETURN(walInit()); + TAOS_CHECK_RETURN(walInit(stopDnodeFp)); return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 9da3207471..581a63671c 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -26,6 +26,7 @@ typedef struct { uint32_t seq; int32_t refSetId; TdThread thread; + stopDnodeFn stopDnode; } SWalMgmt; static SWalMgmt tsWal = {0, .seq = 1}; @@ -35,7 +36,7 @@ static void walFreeObj(void *pWal); int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); } -int32_t walInit() { +int32_t walInit(stopDnodeFn stopDnode) { int8_t old; while (1) { old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2); @@ -57,6 +58,11 @@ int32_t walInit() { atomic_store_8(&tsWal.inited, 1); } + if (stopDnode == NULL) { + wWarn("failed to set stop dnode call back"); + } + tsWal.stopDnode = stopDnode; + return 0; } @@ -164,6 +170,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { goto _err; } + pWal->stopDnode = tsWal.stopDnode; + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); return pWal; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index dc3b2df52c..9979ddd0b0 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -525,6 +525,11 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { if (size != sizeof(SWalIdxEntry)) { wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver); + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } @@ -571,6 +576,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_CHECK_GOTO(code, &lino, _exit); } @@ -627,6 +637,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy taosMemoryFreeClear(newBodyEncrypted); } + if (pWal->stopDnode != NULL) { + wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId); + pWal->stopDnode(); + } + TAOS_CHECK_GOTO(code, &lino, _exit); } From ce7d70c3b3de7f832cbd875a333e2f73f31a63d6 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Thu, 15 Aug 2024 14:02:31 +0800 Subject: [PATCH 02/24] test: (fix case) add timeout-return --- tests/pytest/util/common.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index dee3f505c9..1141ca403d 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -1909,6 +1909,8 @@ class TDCom: if latency < self.stream_timeout: latency += 1 time.sleep(1) + else: + return False return tbname def get_group_id_from_stb(self, stbname): From 04764316b150a6a4e7a14517529a4ffe9fe53344 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 15 Aug 2024 06:07:15 +0000 Subject: [PATCH 03/24] fix/TD-30849-fix-unit-test-compile --- include/libs/wal/wal.h | 4 ++-- source/libs/wal/test/walMetaTest.cpp | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index f74e26eeda..a5d5316d23 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -118,11 +118,11 @@ typedef struct SWal { SHashObj *pRefHash; // refId -> SWalRef // path char path[WAL_PATH_LEN]; - // reusable write head - SWalCkHead writeHead; stopDnodeFn stopDnode; + // reusable write head + SWalCkHead writeHead; } SWal; typedef struct { diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index c7e83e7c86..8bd4de0a89 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -12,7 +12,7 @@ SWalSyncInfo syncMeta = {0}; class WalCleanEnv : public ::testing::Test { protected: static void SetUpTestCase() { - int code = walInit(); + int code = walInit(NULL); ASSERT(code == 0); } @@ -44,7 +44,7 @@ class WalCleanEnv : public ::testing::Test { class WalCleanDeleteEnv : public ::testing::Test { protected: static void SetUpTestCase() { - int code = walInit(); + int code = walInit(NULL); ASSERT(code == 0); } @@ -74,7 +74,7 @@ class WalCleanDeleteEnv : public ::testing::Test { class WalKeepEnv : public ::testing::Test { protected: static void SetUpTestCase() { - int code = walInit(); + int code = walInit(NULL); ASSERT(code == 0); } @@ -111,7 +111,7 @@ class WalKeepEnv : public ::testing::Test { class WalRetentionEnv : public ::testing::Test { protected: static void SetUpTestCase() { - int code = walInit(); + int code = walInit(NULL); ASSERT(code == 0); } From 9059454c905b6125525fc091ac469fe9652132c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 13:11:10 +0800 Subject: [PATCH 04/24] fix(tsdb): check return value. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 69 ++++++++++++--------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index e55ede560e..77a82e0dd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -146,10 +146,14 @@ static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) { pLoadInfo->currentLoadBlockIndex = nextSlotIndex; } -static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { - int32_t code = 0; +static int32_t loadLastBlock(SLDataIter *pIter, const char *idStr, SBlockData **pResBlock) { + if (pResBlock != NULL) { + *pResBlock = NULL; + } + int32_t code = 0; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; + if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { if (pInfo->currentLoadBlockIndex != 0) { tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64 @@ -157,7 +161,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pIter->iSttBlk, pIter->cid, pIter->uid, idStr); pInfo->currentLoadBlockIndex = 0; } - return &pInfo->blockData[0].data; + + *pResBlock = &pInfo->blockData[0].data; + return code; } if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { @@ -167,11 +173,13 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pIter->iSttBlk, pIter->cid, pIter->uid, idStr); pInfo->currentLoadBlockIndex = 1; } - return &pInfo->blockData[1].data; + + *pResBlock = &pInfo->blockData[1].data; + return code; } if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) { - return NULL; + return code; } updateBlockLoadSlot(pInfo); @@ -181,7 +189,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1], pInfo->numOfCols - 1); if (code != TSDB_CODE_SUCCESS) { - goto _exit; + return code; } double el = (taosGetTimestampUs() - st) / 1000.0; @@ -200,14 +208,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex, pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr); - return &pInfo->blockData[pInfo->currentLoadBlockIndex].data; -_exit: - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - } - - return NULL; + *pResBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data; + return code; } // find the earliest block that contains the required records @@ -735,12 +738,17 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) { } } -static void findNextValidRow(SLDataIter *pIter, const char *idStr) { - bool hasVal = false; - int32_t step = pIter->backward ? -1 : 1; - int32_t i = pIter->iRow; +static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) { + bool hasVal = false; + int32_t step = pIter->backward ? -1 : 1; + int32_t i = pIter->iRow; + SBlockData *pData = NULL; - SBlockData *pData = loadLastBlock(pIter, idStr); + int32_t code = loadLastBlock(pIter, idStr, &pData); + if (code) { + tsdbError("failed to load stt block, code:%s, %s", tstrerror(code), idStr); + return code; + } // mostly we only need to find the start position for a given table if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) { @@ -748,7 +756,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { if (i == -1) { tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr); pIter->iRow = -1; - return; + return code; } } @@ -817,20 +825,22 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) { } pIter->iRow = (hasVal) ? i : -1; + return code; } bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { - int32_t step = pIter->backward ? -1 : 1; - terrno = TSDB_CODE_SUCCESS; + int32_t step = pIter->backward ? -1 : 1; + int32_t code = 0; + int32_t iBlockL = pIter->iSttBlk; + SBlockData *pBlockData = NULL; // no qualified last file block in current file, no need to fetch row if (pIter->pSttBlk == NULL) { return false; } - int32_t iBlockL = pIter->iSttBlk; - SBlockData *pBlockData = loadLastBlock(pIter, idStr); - if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) { + code = loadLastBlock(pIter, idStr, &pBlockData); + if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -838,7 +848,10 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { while (1) { bool skipBlock = false; - findNextValidRow(pIter, idStr); + code = findNextValidRow(pIter, idStr); + if (code) { + goto _exit; + } if (pIter->pBlockLoadInfo->checkRemainingRow) { skipBlock = true; @@ -873,8 +886,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { } if (iBlockL != pIter->iSttBlk) { - pBlockData = loadLastBlock(pIter, idStr); - if (pBlockData == NULL) { + code = loadLastBlock(pIter, idStr, &pBlockData); + if ((pBlockData == NULL) || (code != 0)) { goto _exit; } @@ -888,7 +901,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: - return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); + return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); } // SMergeTree ================================================= From 5361d5f38a0ff405b1511dfd1fd2fdaf0bf71d1f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Aug 2024 13:27:51 +0800 Subject: [PATCH 05/24] fix: possible delete data loss when stt_trigger = 1 --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 48 ++++++++++------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 4467102d6f..3c407b31cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -157,41 +157,35 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { int64_t numRecord = 0; SMetaInfo info; - if (committer->tsdb->imem->nDel == 0) { - goto _exit; - } + // if no history data and no new timestamp data, skip tomb data + if (committer->ctx->info->fset || committer->ctx->hasTSData) { + committer->ctx->tbid->suid = 0; + committer->ctx->tbid->uid = 0; + for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { + if (record->uid != committer->ctx->tbid->uid) { + committer->ctx->tbid->suid = record->suid; + committer->ctx->tbid->uid = record->uid; - // do not need to write tomb data if there is no ts data - bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData); - - committer->ctx->tbid->suid = 0; - committer->ctx->tbid->uid = 0; - for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { - if (record->uid != committer->ctx->tbid->uid) { - committer->ctx->tbid->suid = record->suid; - committer->ctx->tbid->uid = record->uid; - - if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { - TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit); - continue; + if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) { + TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit); + continue; + } } - } - if (record->ekey < committer->ctx->minKey) { - // do nothing - } else if (record->skey > committer->ctx->maxKey) { - // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); - } else { - record->skey = TMAX(record->skey, committer->ctx->minKey); - record->ekey = TMIN(record->ekey, committer->ctx->maxKey); + if (record->ekey < committer->ctx->minKey) { + // do nothing + } else if (record->skey > committer->ctx->maxKey) { + // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); + } else { + record->skey = TMAX(record->skey, committer->ctx->minKey); + record->ekey = TMIN(record->ekey, committer->ctx->maxKey); - if (!skip) { numRecord++; TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(committer->writer, record), &lino, _exit); } - } - TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit); + TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit); + } } _exit: From a54e7bec9976628a5f5b772a45ead96b84729879 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Fri, 16 Aug 2024 15:34:34 +0800 Subject: [PATCH 06/24] fix TD-31500 --- packaging/tools/remove.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 093c81eef4..7af64fab1e 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -209,11 +209,11 @@ function clean_service_on_launchctl() { } function remove_data_and_config() { - data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + data_dir=`grep dataDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$data_dir" == X"" ]; then data_dir="/var/lib/${PREFIX}" fi - log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + log_dir=`grep logDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` if [ X"$log_dir" == X"" ]; then log_dir="/var/log/${PREFIX}" fi From 0cbbdf1b888a359e842937dba7022592c55cbbf4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 15:42:53 +0800 Subject: [PATCH 07/24] fix(query): if the return value of addTagPseudoColumnData is not success, not jump out. --- source/libs/executor/inc/executorInt.h | 2 - source/libs/executor/src/scanoperator.c | 54 +++++++++++++++---------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index d295e868e9..48adb22927 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -856,8 +856,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de extern void doDestroyExchangeOperatorInfo(void* param); -int32_t doFilterImpl(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, - SColumnInfoData** pResCol); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1935f2b0b6..d72716b141 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -279,21 +279,20 @@ static int32_t doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBloc return code; } -static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, - int32_t rows) { - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - SExprSupp* pSup = &pTableScanInfo->pseudoSup; - - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, - pTaskInfo, &pTableScanInfo->metaCache); +static int32_t doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, + int32_t rows) { + int32_t code = 0; + SExprSupp* pSup = &pTableScanInfo->pseudoSup; + if (pSup->numOfExprs > 0) { + code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, + pTaskInfo, &pTableScanInfo->metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { - T_LONG_JMP(pTaskInfo->env, code); + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + code = 0; } - - // reset the error code. - terrno = 0; } + + return code; } bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { @@ -373,10 +372,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); + code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); pCost->skipBlocks += 1; pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); - return TSDB_CODE_SUCCESS; + QUERY_CHECK_CODE(code, lino, _end); } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; loadSMA = true; // mark the operation of load sma; @@ -391,9 +390,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); + code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); - return TSDB_CODE_SUCCESS; + QUERY_CHECK_CODE(code, lino, _end); } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); *status = FUNC_DATA_REQUIRED_DATA_LOAD; @@ -473,7 +472,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca qError("[loadDataBlock] p != pBlock"); return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); + + code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); + if (code) { + return code; + } // restore the previous value pCost->totalRows -= pBlock->info.rows; @@ -912,7 +915,8 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas } // set tag/tbname - doSetTagColumnData(pBase, pBlock, pTaskInfo, 1); + terrno = doSetTagColumnData(pBase, pBlock, pTaskInfo, 1); + return pBlock; } @@ -1633,7 +1637,9 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL); QUERY_CHECK_CODE(code, lino, _end); - doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); + code = doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); + QUERY_CHECK_CODE(code, lino, _end); + pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } @@ -2762,12 +2768,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { - int32_t tmpCode = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, + code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. - if (tmpCode != TSDB_CODE_SUCCESS && tmpCode != TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + code = 0; + } + + if (code) { blockDataFreeRes((SSDataBlock*)pBlock); - T_LONG_JMP(pTaskInfo->env, code); + QUERY_CHECK_CODE(code, lino, _end); } // reset the error code. From 3c3507f283f678bae2be88645624e141494863f1 Mon Sep 17 00:00:00 2001 From: sima Date: Fri, 16 Aug 2024 15:54:07 +0800 Subject: [PATCH 08/24] fix:[TD-31503] Return null when expr in timediff is null, and use ms as default time_unit when time_unit is null. --- source/libs/function/src/builtins.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 760a3c4a33..a93ae8e574 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2573,13 +2573,14 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le for (int32_t i = 0; i < 2; ++i) { uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i))->type; - if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType)) { + if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } - + uint8_t para2Type; if (3 == numOfParams) { - if (!IS_INTEGER_TYPE(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type)) { + para2Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type; + if (!IS_INTEGER_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } @@ -2587,7 +2588,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - if (3 == numOfParams) { + if (3 == numOfParams && !IS_NULL_TYPE(para2Type)) { int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2)); if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) { return buildFuncErrMsg(pErrBuf, len, code, From b6cf379247fb213e48bc32f1d2b9b2b8b0aef52c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 16:53:44 +0800 Subject: [PATCH 09/24] fix double send-rsp --- source/dnode/mnode/impl/src/mndStream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a35815cf4d..93397e3a8c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -24,6 +24,7 @@ #include "mndVgroup.h" #include "osMemory.h" #include "parser.h" +#include "taoserror.h" #include "tmisce.h" #include "tname.h" @@ -879,6 +880,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + SName dbname = {0}; code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (code) { @@ -3058,4 +3063,4 @@ _err: mDebug("create drop %d orphan tasks trans succ", numOfTasks); } return code; -} \ No newline at end of file +} From bfbe687d2a19f40577bcacf609420b95ea26047d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 17:04:26 +0800 Subject: [PATCH 10/24] fix(query): return directly. --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d72716b141..0ede1cf379 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -375,7 +375,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); pCost->skipBlocks += 1; pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); - QUERY_CHECK_CODE(code, lino, _end); + return code; } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; loadSMA = true; // mark the operation of load sma; From adc583a93653f490a2c26604e860fc9a73d0cfb7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 17:08:58 +0800 Subject: [PATCH 11/24] fix(stream): fix memory leak. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c50c3c484e..07c67ba007 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1051,7 +1051,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } else { // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); continue; } From 69a09e5a0193fca3408308eddfaa5489d2b66eac Mon Sep 17 00:00:00 2001 From: sima Date: Fri, 16 Aug 2024 17:33:25 +0800 Subject: [PATCH 12/24] fix:[TD-31508] Fix wrong data type in week function. --- source/libs/scalar/src/sclfunc.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a4f32356c6..f81205df7a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2668,8 +2668,8 @@ int32_t weekdayFunctionImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam } struct STm tm; TAOS_CHECK_RETURN(taosTs2Tm(timeVal, timePrec, &tm)); - int32_t ret = startFromZero ? (tm.tm.tm_wday + 6) % 7 : tm.tm.tm_wday + 1; - SCL_ERR_RET(colDataSetVal(pOutput->columnData, i, (const char*)&ret, false)); + int64_t ret = startFromZero ? (tm.tm.tm_wday + 6) % 7 : tm.tm.tm_wday + 1; + colDataSetInt64(pOutput->columnData, i, &ret); } pOutput->numOfRows = pInput->numOfRows; @@ -2778,8 +2778,8 @@ int32_t weekFunctionImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } struct STm tm; SCL_ERR_RET(taosTs2Tm(timeVal, prec, &tm)); - int32_t ret = calculateWeekNum(tm.tm, weekMode(mode)); - SCL_ERR_RET(colDataSetVal(pOutput->columnData, i, (const char*)&ret, false)); + int64_t ret = calculateWeekNum(tm.tm, weekMode(mode)); + colDataSetInt64(pOutput->columnData, i, &ret); } pOutput->numOfRows = pInput->numOfRows; From 3e928fc8fc834cc07b03b7c7f1313fa153eebeaf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 18:25:10 +0800 Subject: [PATCH 13/24] remove backend data file --- include/libs/stream/tstream.h | 42 ++++----- source/dnode/vnode/src/tq/tq.c | 139 ++++++++++++++-------------- source/libs/stream/src/streamTask.c | 80 ++++++++++++---- 3 files changed, 150 insertions(+), 111 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e7f2bf0a6..f916e05d52 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -19,7 +19,6 @@ #include "os.h" #include "streamMsg.h" #include "streamState.h" -#include "streamMsg.h" #include "tdatablock.h" #include "tdbInt.h" #include "tmsg.h" @@ -266,14 +265,14 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { - int64_t startTs; - int64_t checkpointId; // latest checkpoint id - int64_t checkpointVer; // latest checkpoint offset in wal - int64_t checkpointTime; // latest checkpoint time - int64_t processedVer; - int64_t nextProcessVer; // current offset in WAL, not serialize it - int64_t msgVer; - int32_t consensusTransId;// consensus checkpoint id + int64_t startTs; + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time + int64_t processedVer; + int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t msgVer; + int32_t consensusTransId; // consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; } SCheckpointInfo; @@ -454,7 +453,8 @@ struct SStreamTask { SSHashObj* pNameMap; void* pBackend; int8_t subtableWithoutMd5; - char reserve[256]; + char* backendPath; + char reserve[256 - sizeof(char*)]; }; typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); @@ -591,9 +591,9 @@ typedef struct STaskStatusEntry { int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - SVersionRange verRange; // start/end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - double inputQUsed; // in MiB + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + double inputQUsed; // in MiB double inputRate; double procsThroughput; // duration between one element put into input queue and being processed. double procsTotal; // duration between one element put into input queue and being processed. @@ -678,9 +678,9 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); // check downstream status -void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); -void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); +void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); // fill-history task int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); @@ -717,8 +717,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) bool streamTaskIsSinkTask(const SStreamTask* pTask); void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); -void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); -void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); +void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); // source level @@ -812,9 +812,9 @@ void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask); -void streamMutexLock(TdThreadMutex *pMutex); -void streamMutexUnlock(TdThreadMutex *pMutex); -void streamMutexDestroy(TdThreadMutex *pMutex); +void streamMutexLock(TdThreadMutex* pMutex); +void streamMutexUnlock(TdThreadMutex* pMutex); +void streamMutexDestroy(TdThreadMutex* pMutex); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 314a6abdf5..a70a04f23d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,7 +14,10 @@ */ #include "tq.h" +#include "osDef.h" +#include "taoserror.h" #include "tqCommon.h" +#include "tstream.h" #include "vnd.h" // 0: not init @@ -153,7 +156,7 @@ void tqNotifyClose(STQ* pTq) { } void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { - int32_t code = 0; + int32_t code = 0; SMqPollReq req = {0}; code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req); if (code < 0) { @@ -169,7 +172,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } dataRsp.common.blockNum = 0; char buf[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -180,15 +183,15 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { tDeleteMqDataRsp(&dataRsp); } -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, - int32_t type, int32_t vgId) { +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type, + int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); char buf1[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); - (void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); + (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); + (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); @@ -200,7 +203,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t SMqVgOffset vgOffset = {0}; int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = 0; + int32_t code = 0; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { @@ -233,12 +236,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } // save the new offset value - if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){ + if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) { + if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, + msgLen - sizeof(vgOffset.consumerId)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -416,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } char buf[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); @@ -447,7 +451,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); STqOffset* pSavedOffset = NULL; - int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); + int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); if (code != 0) { return TSDB_CODE_TMQ_NO_COMMITTED; } @@ -479,7 +483,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { - int32_t code = 0; + int32_t code = 0; SMqPollReq req = {0}; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); @@ -505,7 +509,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, vgId, req.subKey, pHandle->consumerId); taosRUnLockLatch(&pTq->lock); return TSDB_CODE_TMQ_CONSUMER_MISMATCH; - } int64_t sver = 0, ever = 0; @@ -612,8 +615,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqCheckInfo info = {0}; - int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); - if(code != 0){ + int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); + if (code != 0) { return code; } @@ -650,7 +653,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosRLockLatch(&pTq->lock); STqHandle* pHandle = NULL; - (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); //ignore return code + (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); // ignore return code taosRUnLockLatch(&pTq->lock); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -697,7 +700,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } - end: +end: tDecoderClear(&dc); return ret; } @@ -705,7 +708,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { - STQ* pTq = (STQ*) pTqObj; + STQ* pTq = (STQ*)pTqObj; int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); @@ -749,12 +752,12 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - (void) streamSetupScheduleTrigger(pTask); + (void)streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); - char* p = streamTaskGetStatus(pTask).name; + char* p = streamTaskGetStatus(pTask).name; const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); if (pTask->info.fillHistory) { @@ -766,14 +769,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); } else { - tqInfo( - "vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 - " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64 - " ms, inputVer:%" PRId64, - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); + tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x " + "delaySched:%" PRId64 " ms, inputVer:%" PRId64, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } @@ -781,8 +783,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV return 0; } -int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } +int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); @@ -803,13 +804,13 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, - pStep2Range->maxVer, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, + pStep2Range->minVer, pStep2Range->maxVer, 0.0); int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. if (code) { qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); } - (void) streamExecTask(pTask); // exec directly + (void)streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 @@ -830,12 +831,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); - (void) streamTaskSetSchedStatusInactive(pTask); + (void)streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); if (code == TSDB_CODE_SUCCESS) { - (void) tqScanWalAsync(pTq, false); + (void)tqScanWalAsync(pTq, false); } } } @@ -846,9 +847,9 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) { SStreamMeta* pMeta = pStreamTask->pMeta; STaskId hId = pStreamTask->hTaskInfo.id; SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); + int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); if (pTask == NULL) { - tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId); + tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId); return TSDB_CODE_SUCCESS; } @@ -930,8 +931,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) { streamExecScanHistoryInFuture(pTask, retInfo.idleTime); } else { - SStreamTaskState p = streamTaskGetStatus(pTask); - ETaskStatus s = p.state; + SStreamTaskState p = streamTaskGetStatus(pTask); + ETaskStatus s = p.state; if (s == TASK_STATUS__PAUSE) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, @@ -963,7 +964,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); - return code; // todo: handle failure + return code; // todo: handle failure } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -988,7 +989,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // let's continue scan data in the wal files if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { - (void) tqScanWalAsync(pTq, false); // it's ok to failed + (void)tqScanWalAsync(pTq, false); // it's ok to failed } return code; @@ -1026,11 +1027,9 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); } -int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { - return 0; -} +int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { +int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1092,18 +1091,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } if (!pTq->pVnode->restored) { @@ -1111,9 +1110,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode } SStreamTask* pTask = NULL; @@ -1123,7 +1122,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1136,9 +1135,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // todo retry handle error + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // todo retry handle error } // todo save the checkpoint failed info @@ -1154,14 +1153,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } } else { if (status != TASK_STATUS__HALT) { tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); -// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); } } @@ -1178,16 +1177,17 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; - } else { // checkpoint already finished, and not in checkpoint status + } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 - " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); + " transId:%d already handled, return success", + pTask->id.idStr, req.checkpointId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1198,7 +1198,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); if (code) { - qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, + tstrerror(code)); return code; } @@ -1215,7 +1216,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1228,7 +1229,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1249,7 +1250,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1264,9 +1265,7 @@ int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) { } // this function is needed, do not try to remove it. -int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); -} +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f190673430..c531260682 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -14,6 +14,8 @@ */ #include "executor.h" +#include "osDir.h" +#include "osMemory.h" #include "streamInt.h" #include "streamsm.h" #include "tmisce.h" @@ -30,7 +32,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); pTask->info.selfChildId = childId; void* p = taosArrayPush(pArray, &pTask); - return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; + return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { @@ -42,7 +44,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp if (!isEqual) { (*pUpdated) = true; char tmp[512] = {0}; - (void) epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors + (void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors epsetAssign(&pTask->info.epSet, pEpSet); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); @@ -92,7 +94,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { } int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { + SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { *p = NULL; SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -224,17 +226,17 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->schedInfo.pDelayTimer != NULL) { - (void) taosTmrStop(pTask->schedInfo.pDelayTimer); + (void)taosTmrStop(pTask->schedInfo.pDelayTimer); pTask->schedInfo.pDelayTimer = NULL; } if (pTask->hTaskInfo.pTimer != NULL) { - (void) taosTmrStop(pTask->hTaskInfo.pTimer); + (void)taosTmrStop(pTask->hTaskInfo.pTimer); pTask->hTaskInfo.pTimer = NULL; } if (pTask->msgInfo.pRetryTmr != NULL) { - (void) taosTmrStop(pTask->msgInfo.pRetryTmr); + (void)taosTmrStop(pTask->msgInfo.pRetryTmr); pTask->msgInfo.pRetryTmr = NULL; } @@ -321,10 +323,19 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); streamStateClose(pTask->pState, remove); - if (remove)taskDbSetClearFileFlag(pTask->pBackend); + if (remove) taskDbSetClearFileFlag(pTask->pBackend); + taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; + } else { + if (remove) { + if (pTask->backendPath != NULL) { + taosRemoveDir(pTask->backendPath); + taosMemoryFree(pTask->backendPath); + pTask->backendPath = NULL; + } + } } } @@ -364,8 +375,36 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { } } +int32_t streamTaskSetBackendPath(SStreamTask* pTask) { + int64_t streamId = 0; + int32_t taskId = 0; + + if (pTask->info.fillHistory) { + streamId = pTask->hTaskInfo.id.taskId; + taskId = pTask->hTaskInfo.id.taskId; + } else { + streamId = pTask->streamTaskId.taskId; + taskId = pTask->streamTaskId.taskId; + } + + char id[128] = {0}; + int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId); + if (nBytes < 0 || nBytes >= sizeof(id)) { + return TSDB_CODE_OUT_OF_BUFFER; + } + + int32_t len = strlen(pTask->pMeta->path); + pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2); + if (pTask->backendPath == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); + + return 0; +} int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { - (void) createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); + (void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; @@ -459,8 +498,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } if (pTask->chkInfo.pActiveInfo == NULL) { - code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); } + code = streamTaskSetBackendPath(pTask); return code; } @@ -494,12 +534,12 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre } void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo); - return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; + return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - (void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. + (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -510,7 +550,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); epsetAssign(&pInfo->epSet, pEpSet); stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, @@ -545,7 +585,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - (void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. + (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; @@ -564,7 +604,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (!isEqual) { *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); epsetAssign(&pVgInfo->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, @@ -584,7 +624,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); epsetAssign(&pDispatcher->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, @@ -919,7 +959,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta; - int32_t code = 0; + int32_t code = 0; int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); @@ -935,7 +975,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { } void streamTaskPause(SStreamTask* pTask) { - (void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); + (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); } void streamTaskResume(SStreamTask* pTask) { @@ -1142,13 +1182,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; if (pTriggerTmr->tmrHandle != NULL) { - (void) taosTmrStop(pTriggerTmr->tmrHandle); + (void)taosTmrStop(pTriggerTmr->tmrHandle); pTriggerTmr->tmrHandle = NULL; } SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; if (pReadyTmr->tmrHandle != NULL) { - (void) taosTmrStop(pReadyTmr->tmrHandle); + (void)taosTmrStop(pReadyTmr->tmrHandle); pReadyTmr->tmrHandle = NULL; } @@ -1185,4 +1225,4 @@ const char* streamTaskGetExecType(int32_t type) { default: return "invalid-exec-type"; } -} \ No newline at end of file +} From b5dd25a9ad5843b9213e1ddcc1baee9877a2d3e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 19:34:08 +0800 Subject: [PATCH 14/24] fix(query): return directly. --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0ede1cf379..b3655e16f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -392,7 +392,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); - QUERY_CHECK_CODE(code, lino, _end); + return code; } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); *status = FUNC_DATA_REQUIRED_DATA_LOAD; From 6bdbbc4b15adb59dcc1a77c579438b77e1899858 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 19:40:51 +0800 Subject: [PATCH 15/24] fix mem leak --- source/libs/stream/src/streamTask.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c531260682..b7b54e2885 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -298,14 +298,6 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = NULL; - // if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { - // char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128); - // sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr); - // taosRemoveDir(path); - - // stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path); - // taosMemoryFree(path); - // } if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); @@ -328,6 +320,11 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; + + if (pTask->backendPath != NULL) { + taosMemoryFree(pTask->backendPath); + pTask->backendPath = NULL; + } } else { if (remove) { if (pTask->backendPath != NULL) { From 367b6eee729cde15c12df744fec55fd61a4267ac Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 21:01:02 +0800 Subject: [PATCH 16/24] fix mem leak --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamTask.c | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f916e05d52..fd2802058d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -453,8 +453,8 @@ struct SStreamTask { SSHashObj* pNameMap; void* pBackend; int8_t subtableWithoutMd5; + char reserve[256]; char* backendPath; - char reserve[256 - sizeof(char*)]; }; typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b7b54e2885..cee24bb8dc 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -298,7 +298,6 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = NULL; - if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); } @@ -496,10 +495,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i if (pTask->chkInfo.pActiveInfo == NULL) { code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + if (code) { + stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } } - code = streamTaskSetBackendPath(pTask); - return code; + return streamTaskSetBackendPath(pTask); } int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { From 871f4dcb3df010187fb5066c051f57b4af7ab0c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 21:18:22 +0800 Subject: [PATCH 17/24] fix mem leak --- source/libs/stream/src/streamTask.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index cee24bb8dc..ff9688e3d6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -319,7 +319,6 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; - if (pTask->backendPath != NULL) { taosMemoryFree(pTask->backendPath); pTask->backendPath = NULL; From d0beeea91712fe5778f1ee25f3f02f7f38e1ce65 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 16 Aug 2024 21:18:42 +0800 Subject: [PATCH 18/24] fix mem leak --- source/libs/stream/src/streamTask.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ff9688e3d6..fc72489ee6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -315,7 +315,6 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { streamStateClose(pTask->pState, remove); if (remove) taskDbSetClearFileFlag(pTask->pBackend); - taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; From 5b82556ff5d716530a70390d3c4859c8f3b314cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Aug 2024 22:31:21 +0800 Subject: [PATCH 19/24] fix(query): reset the errno code. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 3 ++- source/dnode/vnode/src/tsdb/tsdbRead2.c | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 099bde5897..9938c073ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -833,7 +833,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { int32_t code = 0; int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = NULL; - + terrno = 0; + // no qualified last file block in current file, no need to fetch row if (pIter->pSttBlk == NULL) { return false; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0572df2922..9a9c74a3a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -3598,6 +3598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (pBlockIter->numOfBlocks == 0) { // let's try to extract data from stt files. + terrno = 0; ERetrieveType type = doReadDataFromSttFiles(pReader); if (type == TSDB_READ_RETURN) { return terrno; From 1f6cef26e8f09ea0dc7a16777acec7ac54f85608 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 01:44:58 +0800 Subject: [PATCH 20/24] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 27 +++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 9938c073ff..4729b912a7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -828,16 +828,19 @@ static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) { return code; } -bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { +int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) { int32_t step = pIter->backward ? -1 : 1; int32_t code = 0; int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = NULL; + int32_t lino = 0; + + *hasNext = false; terrno = 0; // no qualified last file block in current file, no need to fetch row if (pIter->pSttBlk == NULL) { - return false; + return code; } code = loadLastBlock(pIter, idStr, &pBlockData); @@ -850,9 +853,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { while (1) { bool skipBlock = false; code = findNextValidRow(pIter, idStr); - if (code) { - goto _exit; - } + TSDB_CHECK_CODE(code, lino, _exit); if (pIter->pBlockLoadInfo->checkRemainingRow) { skipBlock = true; @@ -902,7 +903,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: - return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); + *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); + return code; } // SMergeTree ================================================= @@ -1005,7 +1007,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF goto _end; } - bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); + bool hasVal = NULL; + code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + if (code) { + goto _end; + } + if (hasVal) { tMergeTreeAddIter(pMTree, pIter); @@ -1018,7 +1025,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF pSttDataInfo->numOfRows += numOfRows; } } else { - TAOS_CHECK_GOTO(terrno, NULL, _end); if (!pMTree->ignoreEarlierTs) { pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; } @@ -1100,8 +1106,9 @@ bool tMergeTreeNext(SMergeTree *pMTree) { if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; - bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); - if (!hasVal) { + bool hasVal = false; + int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + if (!hasVal || (code != 0)) { pMTree->pIter = NULL; } From f4bac239064f146a41ced20982ab93b893f6a30a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 01:47:30 +0800 Subject: [PATCH 21/24] refactor: do some internal refactor --- source/dnode/mnode/impl/src/mndStreamHb.c | 46 ++++++++++++++++------ source/dnode/vnode/src/tqCommon/tqCommon.c | 21 ---------- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamHb.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamTask.c | 2 + 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 59f07ce977..f515e9565d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); -static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); +static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); +static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks); void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); @@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } -void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { +void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); @@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { - mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId); - - SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; - void* px = taosArrayPush(pOrphanTasks, &oTask); - if (px == NULL) { - mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId); - } + checkforOrphanTask(pMnode, p, pOrphanTasks); continue; } @@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { - mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); + mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s", + p->id.streamId, (int32_t)p->id.taskId, tstrerror(code)); continue; } @@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (code == 0) { mndAddConsensusTasks(pInfo, &cp); } else { - mError("failed to get consensus checkpoint-info"); + mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId); } mndReleaseStream(pMnode, pStream); @@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pFailedChkpt, &info); + addIntoFailedChkptList(pFailedChkpt, &info); // remove failed trans from pChkptStreams code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); @@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { // make sure that the unit test case can work code = mndStreamSendUpdateChkptInfoMsg(pMnode); + if (code) { + mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code)); + } } streamMutexUnlock(&execInfo.lock); @@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_ tmsgSendRsp(&rsp); pRpcInfo->handle = NULL; // disable auto rsp } + +void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) { + SStreamObj *pStream = NULL; + + int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); + if (code) { + mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list", + p->id.streamId, p->id.taskId); + + SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; + void *px = taosArrayPush(pOrphanTasks, &oTask); + if (px == NULL) { + mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno)); + } + } else { + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet", + p->id.taskId); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7037eb5199..422ca16e50 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } -int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId); - SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { - tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - code = streamTaskSendRestoreChkptMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - // this is to process request from transaction, always return true. int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9be8f5ffaa..0ef7c2312a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { return code; } -int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { +int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index d2c5cb05b7..73392fade0 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { if ((*pTask)->status.requireConsensusChkptId) { entry.checkpointInfo.consensusChkptId = 1; (*pTask)->status.requireConsensusChkptId = false; - stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 07c67ba007..5bec930455 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1424,7 +1424,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } // negotiate the consensus checkpoint id for current task - code = streamTaskSendRestoreChkptMsg(pTask); + code = streamTaskSendNegotiateChkptIdMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f190673430..5628095973 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1182,6 +1182,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; + case 0: + return "exec-all-tasks"; default: return "invalid-exec-type"; } From b6c3e7574696482615d76a55ca5467c1b230c81e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 17 Aug 2024 09:47:43 +0800 Subject: [PATCH 22/24] fix mem leak --- source/libs/stream/src/streamTask.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fc72489ee6..1cbbcac046 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -318,19 +318,18 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; - if (pTask->backendPath != NULL) { - taosMemoryFree(pTask->backendPath); - pTask->backendPath = NULL; - } } else { if (remove) { if (pTask->backendPath != NULL) { taosRemoveDir(pTask->backendPath); - taosMemoryFree(pTask->backendPath); - pTask->backendPath = NULL; } } } + + if (pTask->backendPath != NULL) { + taosMemoryFree(pTask->backendPath); + pTask->backendPath = NULL; + } } static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { From 0d3d0730d4cf697abdb90999e30330dc4f848eba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 15:30:10 +0800 Subject: [PATCH 23/24] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 5 +- source/libs/stream/src/streamMeta.c | 402 +------------------- source/libs/stream/src/streamStartTask.c | 444 +++++++++++++++++++++++ 3 files changed, 452 insertions(+), 399 deletions(-) create mode 100644 source/libs/stream/src/streamStartTask.c diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e7f2bf0a6..8d6184cab6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -750,6 +750,9 @@ void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); +int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo); +void streamMetaClearStartInfo(STaskStartInfo* pStartInfo); + int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); @@ -770,7 +773,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); -int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); +int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5bec930455..a9976760b6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -37,12 +37,6 @@ typedef struct { SHashObj* pTable; } SMetaRefMgt; -typedef struct STaskInitTs { - int64_t start; - int64_t end; - bool success; -} STaskInitTs; - SMetaRefMgt gMetaRefMgt; int32_t metaRefMgtInit(); @@ -405,15 +399,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, goto _err; } - pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->startInfo.pReadyTaskSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); - if (pMeta->startInfo.pFailedTaskSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = streamMetaInitStartInfo(&pMeta->startInfo); + if (code) { goto _err; } @@ -609,8 +596,8 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->updateInfo.pTasks); - taosHashCleanup(pMeta->startInfo.pReadyTaskSet); - taosHashCleanup(pMeta->startInfo.pFailedTaskSet); + + streamMetaClearStartInfo(&pMeta->startInfo); destroyMetaHbInfo(pMeta->pHbInfo); pMeta->pHbInfo = NULL; @@ -1191,18 +1178,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) { streamMetaHbToMnode(pRid, NULL); } -void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { - taosHashClear(pStartInfo->pReadyTaskSet); - taosHashClear(pStartInfo->pFailedTaskSet); - pStartInfo->tasksWillRestart = 0; - pStartInfo->readyTs = 0; - pStartInfo->elapsedTime = 0; - - // reset the sentinel flag value to be 0 - pStartInfo->startAllTasks = 0; - stDebug("vgId:%d clear start-all-task info", vgId); -} - void streamMetaRLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-rlock", pMeta->vgId); (void)taosThreadRwlockRdlock(&pMeta->lock); @@ -1302,185 +1277,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) { - streamMetaWLock(pMeta); - - if (pMeta->closeFlag) { - streamMetaWUnLock(pMeta); - stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); - return TSDB_CODE_FAILED; - } - - *pList = taosArrayDup(pMeta->pTaskList, NULL); - if (*pList == NULL) { - return terrno; - } - - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = now; - - int32_t code = streamMetaResetTaskStatus(pMeta); - streamMetaWUnLock(pMeta); - - return code; -} - -// restore the checkpoint id by negotiating the latest consensus checkpoint id -int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; - int64_t now = taosGetTimestampMs(); - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); - - if (numOfTasks == 0) { - stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); - return TSDB_CODE_SUCCESS; - } - - SArray* pTaskList = NULL; - code = prepareBeforeStartTasks(pMeta, &pTaskList, now); - if (code != TSDB_CODE_SUCCESS) { - ASSERT(pTaskList == NULL); - return TSDB_CODE_SUCCESS; - } - - // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. - numOfTasks = taosArrayGetSize(pTaskList); - - // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization, when the operation of check downstream tasks status is executed far quickly. - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); - continue; - } - - if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) { - code = pMeta->expandTaskFn(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); - } - } - - streamMetaReleaseTask(pMeta, pTask); - } - - // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here. - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - - SStreamTask* pTask = NULL; - code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); - continue; - } - - STaskExecStatisInfo* pInfo = &pTask->execInfo; - - // fill-history task can only be launched by related stream tasks. - if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - // ready now, start the related fill-history task - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? - } - - (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, - true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (ret != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - code = ret; - - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } - - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - // negotiate the consensus checkpoint id for current task - code = streamTaskSendNegotiateChkptIdMsg(pTask); - - // this task may has no checkpoint, but others tasks may generate checkpoint already? - streamMetaReleaseTask(pMeta, pTask); - } - - // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without - // initialization, when the operation of check downstream tasks status is executed far quickly. - stInfo("vgId:%d start all task(s) completed", pMeta->vgId); - taosArrayDestroy(pTaskList); - return code; -} - -int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { - streamMetaRLock(pMeta); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); - if (num == 0) { - stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num); - streamMetaRUnLock(pMeta); - return TSDB_CODE_SUCCESS; - } - - int64_t st = taosGetTimestampMs(); - - // send hb msg to mnode before closing all tasks. - SArray* pTaskList = NULL; - int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int32_t numOfTasks = taosArrayGetSize(pTaskList); - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = NULL; - - code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - (void)streamTaskStop(pTask); - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el); - - streamMetaRUnLock(pMeta); - return 0; -} - bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { @@ -1499,196 +1295,6 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { return true; } -int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; - SStreamTask* pTask = NULL; - bool continueExec = true; - - stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); - - code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); - if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); - (void)streamMetaAddFailedTask(pMeta, streamId, taskId); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - // fill-history task can only be launched by related stream tasks. - STaskExecStatisInfo* pInfo = &pTask->execInfo; - if (pTask->info.fillHistory == 1) { - stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; - } - - // the start all tasks procedure may happen to start the newly deployed stream task, and results in the - // concurrently start this task by two threads. - streamMutexLock(&pTask->lock); - SStreamTaskState status = streamTaskGetStatus(pTask); - if (status.state != TASK_STATUS__UNINIT) { - stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); - continueExec = false; - } else { - continueExec = true; - } - streamMutexUnlock(&pTask->lock); - - if (!continueExec) { - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - ASSERT(pTask->status.downstreamReady == 0); - - // avoid initialization and destroy running concurrently. - streamMutexLock(&pTask->lock); - if (pTask->pBackend == NULL) { - code = pMeta->expandTaskFn(pTask); - streamMutexUnlock(&pTask->lock); - - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } else { - streamMutexUnlock(&pTask->lock); - } - - // concurrently start task may cause the later started task be failed, and also failed to added into meta result. - if (code == TSDB_CODE_SUCCESS) { - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, - tstrerror(code)); - - // do no added into result hashmap if it is failed due to concurrently starting of this stream task. - if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - } - } - } - - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { - int32_t vgId = pMeta->vgId; - void* pIter = NULL; - size_t keyLen = 0; - - stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), - succ ? "success" : "failed"); - - while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { - STaskInitTs* pInfo = pIter; - void* key = taosHashGetKey(pIter, &keyLen); - - SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - if (pTask1 == NULL) { - stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); - } else { - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); - } - } -} - -// check all existed tasks are received rsp -static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) { - for (int32_t i = 0; i < numOfTotal; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (pTaskId == NULL) { - continue; - } - - STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx)); - if (px == NULL) { - px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx)); - if (px == NULL) { - return false; - } - } - } - - return true; -} - -int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, - int64_t endTs, bool ready) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - STaskId id = {.streamId = streamId, .taskId = taskId}; - int32_t vgId = pMeta->vgId; - bool allRsp = true; - - streamMetaWLock(pMeta); - SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); - streamMetaWUnLock(pMeta); - return 0; - } - - // clear the send consensus-checkpointId flag - streamMutexLock(&(*p)->lock); - (*p)->status.sendConsensusChkptId = false; - streamMutexUnlock(&(*p)->lock); - - if (pStartInfo->startAllTasks != 1) { - int64_t el = endTs - startTs; - stDebug( - "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " - "time:%" PRId64 "ms", - vgId, taskId, ready, el); - streamMetaWUnLock(pMeta); - return 0; - } - - STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; - SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); - if (code) { - if (code == TSDB_CODE_DUP_KEY) { - stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 - " already exist start results in meta start task result hashmap", - vgId, id.taskId); - } else { - stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); - } - streamMetaWUnLock(pMeta); - return code; - } - - int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - - allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal); - if (allRsp) { - pStartInfo->readyTs = taosGetTimestampMs(); - pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", - vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, - pStartInfo->elapsedTime / 1000.0); - - // print the initialization elapsed time and info - displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); - displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo, vgId); - streamMetaWUnLock(pMeta); - - code = pStartInfo->completeFn(pMeta); - } else { - streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, - numOfRecv, numOfTotal); - } - - return code; -} - int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c new file mode 100644 index 0000000000..3cf06fd04a --- /dev/null +++ b/source/libs/stream/src/streamStartTask.c @@ -0,0 +1,444 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamBackendRocksdb.h" +#include "streamInt.h" +#include "tmisce.h" +#include "tref.h" +#include "tsched.h" +#include "tstream.h" +#include "ttimer.h" +#include "wal.h" + +typedef struct STaskInitTs { + int64_t start; + int64_t end; + bool success; +} STaskInitTs; + +static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now); +static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); + +// restore the checkpoint id by negotiating the latest consensus checkpoint id +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; + int64_t now = taosGetTimestampMs(); + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); + + if (numOfTasks == 0) { + stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); + return TSDB_CODE_SUCCESS; + } + + SArray* pTaskList = NULL; + code = prepareBeforeStartTasks(pMeta, &pTaskList, now); + if (code != TSDB_CODE_SUCCESS) { + ASSERT(pTaskList == NULL); + return TSDB_CODE_SUCCESS; + } + + // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. + numOfTasks = taosArrayGetSize(pTaskList); + + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without + // initialization, when the operation of check downstream tasks status is executed far quickly. + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = NULL; + code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if (pTask == NULL) { + stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + continue; + } + + if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) { + code = pMeta->expandTaskFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); + streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); + } + } + + streamMetaReleaseTask(pMeta, pTask); + } + + // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here. + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + + SStreamTask* pTask = NULL; + code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if (pTask == NULL) { + stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); + (void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + continue; + } + + STaskExecStatisInfo* pInfo = &pTask->execInfo; + + // fill-history task can only be launched by related stream tasks. + if (pTask->info.fillHistory == 1) { + stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + // ready now, start the related fill-history task + if (pTask->status.downstreamReady == 1) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", + pTask->id.idStr); + (void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task? + } + + (void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, + true); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (ret != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); + code = ret; + + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } + } + + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + // negotiate the consensus checkpoint id for current task + code = streamTaskSendNegotiateChkptIdMsg(pTask); + + // this task may has no checkpoint, but others tasks may generate checkpoint already? + streamMetaReleaseTask(pMeta, pTask); + } + + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without + // initialization, when the operation of check downstream tasks status is executed far quickly. + stInfo("vgId:%d start all task(s) completed", pMeta->vgId); + taosArrayDestroy(pTaskList); + return code; +} + +int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) { + streamMetaWLock(pMeta); + + if (pMeta->closeFlag) { + streamMetaWUnLock(pMeta); + stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); + return TSDB_CODE_FAILED; + } + + *pList = taosArrayDup(pMeta->pTaskList, NULL); + if (*pList == NULL) { + return terrno; + } + + taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); + pMeta->startInfo.startTs = now; + + int32_t code = streamMetaResetTaskStatus(pMeta); + streamMetaWUnLock(pMeta); + + return code; +} + +void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { + taosHashClear(pStartInfo->pReadyTaskSet); + taosHashClear(pStartInfo->pFailedTaskSet); + pStartInfo->tasksWillRestart = 0; + pStartInfo->readyTs = 0; + pStartInfo->elapsedTime = 0; + + // reset the sentinel flag value to be 0 + pStartInfo->startAllTasks = 0; + stDebug("vgId:%d clear start-all-task info", vgId); +} + +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; + int32_t vgId = pMeta->vgId; + bool allRsp = true; + + streamMetaWLock(pMeta); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p == NULL) { // task does not exists in current vnode, not record the complete info + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); + streamMetaWUnLock(pMeta); + return 0; + } + + // clear the send consensus-checkpointId flag + streamMutexLock(&(*p)->lock); + (*p)->status.sendConsensusChkptId = false; + streamMutexUnlock(&(*p)->lock); + + if (pStartInfo->startAllTasks != 1) { + int64_t el = endTs - startTs; + stDebug( + "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " + "time:%" PRId64 "ms", + vgId, taskId, ready, el); + streamMetaWUnLock(pMeta); + return 0; + } + + STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; + int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + if (code) { + if (code == TSDB_CODE_DUP_KEY) { + stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 + " already exist start results in meta start task result hashmap", + vgId, id.taskId); + } else { + stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); + } + streamMetaWUnLock(pMeta); + return code; + } + + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); + + allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal); + if (allRsp) { + pStartInfo->readyTs = taosGetTimestampMs(); + pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; + + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); + + // print the initialization elapsed time and info + displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); + displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); + streamMetaResetStartInfo(pStartInfo, vgId); + streamMetaWUnLock(pMeta); + + code = pStartInfo->completeFn(pMeta); + } else { + streamMetaWUnLock(pMeta); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, + numOfRecv, numOfTotal); + } + + return code; +} + +// check all existed tasks are received rsp +bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) { + for (int32_t i = 0; i < numOfTotal; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (pTaskId == NULL) { + continue; + } + + STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx)); + if (px == NULL) { + px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx)); + if (px == NULL) { + return false; + } + } + } + + return true; +} + +void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { + int32_t vgId = pMeta->vgId; + void* pIter = NULL; + size_t keyLen = 0; + + stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), + succ ? "success" : "failed"); + + while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { + STaskInitTs* pInfo = pIter; + void* key = taosHashGetKey(pIter, &keyLen); + + SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); + if (pTask1 == NULL) { + stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); + } else { + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, + (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + } + } +} + +int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) { + _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + + pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); + if (pStartInfo->pReadyTaskSet == NULL) { + return terrno; + } + + pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); + if (pStartInfo->pFailedTaskSet == NULL) { + return terrno; + } + + return 0; +} + +void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) { + taosHashClear(pStartInfo->pReadyTaskSet); + taosHashClear(pStartInfo->pFailedTaskSet); + pStartInfo->readyTs = 0; + pStartInfo->elapsedTime = 0; + pStartInfo->startTs = 0; + pStartInfo->startAllTasks = 0; + pStartInfo->tasksWillRestart = 0; + pStartInfo->restartCount = 0; +} + +int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = 0; + int32_t vgId = pMeta->vgId; + SStreamTask* pTask = NULL; + bool continueExec = true; + + stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); + + code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); + if (pTask == NULL) { + stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); + (void)streamMetaAddFailedTask(pMeta, streamId, taskId); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + // fill-history task can only be launched by related stream tasks. + STaskExecStatisInfo* pInfo = &pTask->execInfo; + if (pTask->info.fillHistory == 1) { + stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + + // the start all tasks procedure may happen to start the newly deployed stream task, and results in the + // concurrently start this task by two threads. + streamMutexLock(&pTask->lock); + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__UNINIT) { + stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); + continueExec = false; + } else { + continueExec = true; + } + streamMutexUnlock(&pTask->lock); + + if (!continueExec) { + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + ASSERT(pTask->status.downstreamReady == 0); + + // avoid initialization and destroy running concurrently. + streamMutexLock(&pTask->lock); + if (pTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pTask); + streamMutexUnlock(&pTask->lock); + + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } + } else { + streamMutexUnlock(&pTask->lock); + } + + // concurrently start task may cause the later started task be failed, and also failed to added into meta result. + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + tstrerror(code)); + + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } + } + } + + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { + streamMetaRLock(pMeta); + + int32_t num = taosArrayGetSize(pMeta->pTaskList); + stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); + if (num == 0) { + stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num); + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; + } + + int64_t st = taosGetTimestampMs(); + + // send hb msg to mnode before closing all tasks. + SArray* pTaskList = NULL; + int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t numOfTasks = taosArrayGetSize(pTaskList); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = NULL; + + code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + (void)streamTaskStop(pTask); + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el); + + streamMetaRUnLock(pMeta); + return 0; +} + + From 89eaf01621163256fbfa551317808772c123ecf2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Aug 2024 23:23:18 +0800 Subject: [PATCH 24/24] fix(stream):fix memory leak. --- source/libs/stream/src/streamStartTask.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 3cf06fd04a..99f4e84951 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -314,8 +314,8 @@ int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) { - taosHashClear(pStartInfo->pReadyTaskSet); - taosHashClear(pStartInfo->pFailedTaskSet); + taosHashCleanup(pStartInfo->pReadyTaskSet); + taosHashCleanup(pStartInfo->pFailedTaskSet); pStartInfo->readyTs = 0; pStartInfo->elapsedTime = 0; pStartInfo->startTs = 0;