From 9728518db030fd556f7ef12a692bf2b5a935e363 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 22 Jan 2024 15:18:49 +0800 Subject: [PATCH 01/12] feat: extract rows within limit before sort to disk --- source/libs/executor/src/tsort.c | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..24df12d06b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { + int64_t keepRows = pOrigBlk->info.rows; + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + if (nRows >= pHandle->mergeLimit) { + keepRows = pHandle->mergeLimit - prevRows; + } + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + } else { + pBlock = createOneDataBlock(pOrigBlk, true); + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + if (pBlk != NULL && pHandle->mergeLimit != -1) { + pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; } From 3f441bb8cfa9d2652fe86cf25c48c24930a6c940 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 22 Jan 2024 15:18:49 +0800 Subject: [PATCH 02/12] feat: extract rows within limit before sort to disk --- source/libs/executor/src/tsort.c | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..24df12d06b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { + int64_t keepRows = pOrigBlk->info.rows; + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + if (nRows >= pHandle->mergeLimit) { + keepRows = pHandle->mergeLimit - prevRows; + } + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + } else { + pBlock = createOneDataBlock(pOrigBlk, true); + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + if (pBlk != NULL && pHandle->mergeLimit != -1) { + pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; } From 6ca92a3d925ec5f364969b4c2df0f5f2cb5b22ed Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 08:41:59 +0800 Subject: [PATCH 03/12] fix: meory leak --- source/libs/executor/src/tsort.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 24df12d06b..3e8d1628aa 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,7 +1040,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } -static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { +static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { int64_t keepRows = pOrigBlk->info.rows; int64_t nRows = 0; int64_t prevRows = 0; @@ -1061,8 +1061,9 @@ static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* m SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + *pExtractedBlock = true; } else { - pBlock = createOneDataBlock(pOrigBlk, true); + *pExtractedBlock = false; } return pBlock; } @@ -1096,8 +1097,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); int64_t p = taosGetTimestampUs(); + bool bExtractedBlock = false; if (pBlk != NULL && pHandle->mergeLimit != -1) { - pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); } if (pBlk != NULL) { @@ -1116,8 +1118,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); blockDataMerge(tBlk, pBlk); + if (bExtractedBlock) { + blockDataDestroy(pBlk); + } } else { - SSDataBlock* tBlk = createOneDataBlock(pBlk, true); + SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); taosArrayPush(aBlkSort, &tBlk); } From 5c9edce538b8f5d36e22869802542d260c98c56f Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 10:45:26 +0800 Subject: [PATCH 04/12] fix: whole block error --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 6 ++++-- tests/script/tsim/parser/limit_stb.sim | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..808956ff5c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3660,7 +3660,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* terrno = TSDB_CODE_TSC_QUERY_CANCELLED; T_LONG_JMP(pOperator->pTaskInfo->env, terrno); } - + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3e8d1628aa..38aac39d8e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -885,7 +885,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); ASSERT(size <= getBufPageSize(pHandle->pBuf)); - + blockDataToBuf(pPage, blk); setBufPageDirty(pPage, true); @@ -1041,7 +1041,6 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { - int64_t keepRows = pOrigBlk->info.rows; int64_t nRows = 0; int64_t prevRows = 0; void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); @@ -1055,15 +1054,18 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH nRows = *(int64_t*)pNum; } + int64_t keepRows = pOrigBlk->info.rows; if (nRows >= pHandle->mergeLimit) { keepRows = pHandle->mergeLimit - prevRows; } + SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); *pExtractedBlock = true; } else { *pExtractedBlock = false; + pBlock = pOrigBlk; } return pBlock; } diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 7d6aff3b51..2e8f029260 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -129,6 +129,7 @@ endi $offset = $tbNum * $rowNum $offset = $offset - 1 +print select * from $stb order by ts limit 2 offset $offset sql select * from $stb order by ts limit 2 offset $offset if $rows != 1 then return -1 From cc45d7a6f3195d7651668323c56c9cb549d09efc Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 23 Jan 2024 15:35:27 +0800 Subject: [PATCH 05/12] fix: tcache conn obj ref count not released --- source/util/src/tcache.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 11f8df4c93..0a7842194e 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -994,6 +994,12 @@ void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) { } void taosCacheDestroyIter(SCacheIter *pIter) { + for (int32_t i = 0; i < pIter->numOfObj; ++i) { + if (!pIter->pCurrent[i]) continue; + char *p = pIter->pCurrent[i]->data; + taosCacheRelease(pIter->pCacheObj, (void **)&p, false); + pIter->pCurrent[i] = NULL; + } taosMemoryFreeClear(pIter->pCurrent); taosMemoryFreeClear(pIter); } From 70f869ce33f30eeeb446219f2979204f9c968c5d Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 23 Jan 2024 17:05:03 +0800 Subject: [PATCH 06/12] fix: memory error when limit 0 --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 38aac39d8e..64f47baca9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1100,7 +1100,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); bool bExtractedBlock = false; - if (pBlk != NULL && pHandle->mergeLimit != -1) { + if (pBlk != NULL && pHandle->mergeLimit > 0) { pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); } From 57a9ac75a8644510e17f9d3359f8ef54ba2f35f2 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 24 Jan 2024 11:37:47 +0800 Subject: [PATCH 07/12] feat: remove limit reached from merge scan operator --- source/libs/executor/inc/tsort.h | 4 +++ source/libs/executor/src/scanoperator.c | 34 +++++++------------------ source/libs/executor/src/tsort.c | 11 ++++++++ 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 365acf2bff..436d1cefb8 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo */ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple); +/** + * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param +*/ +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 808956ff5c..18ab5fc17d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3324,26 +3324,16 @@ _error: return NULL; } -static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { - int64_t nRows = 0; - void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); - if (pNum == NULL) { - nRows = pBlock->info.rows; - tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); - } else { - *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; - nRows = *(int64_t*)pNum; - } - - if (nRows >= pInfo->mergeLimit) { - if (pInfo->mSkipTables == NULL) { +static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { + STableMergeScanInfo* pInfo = pTableMergeScanInfo; + if (pInfo->mSkipTables == NULL) { pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - } - int bSkip = 1; - taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); } - return TSDB_CODE_SUCCESS; + int bSkip = 1; + if (pInfo->mSkipTables != NULL) { + taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip)); + } } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { @@ -3459,10 +3449,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); - if (pInfo->mergeLimit != -1) { - tableMergeScanDoSkipTable(pInfo, pBlock); - } - pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; return pBlock; @@ -3529,6 +3515,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); + tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); @@ -3756,8 +3743,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tSimpleHashCleanup(pTableScanInfo->mTableNumRows); - pTableScanInfo->mTableNumRows = NULL; taosHashCleanup(pTableScanInfo->mSkipTables); pTableScanInfo->mSkipTables = NULL; destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); @@ -3849,8 +3834,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - pInfo->mTableNumRows = tSimpleHashInit(1024, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + pInfo->mergeLimit = -1; bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; if (hasLimit) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 64f47baca9..db9266cb8f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -75,6 +75,9 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; + + void (*mergeLimitReachedFn)(uint64_t tableUid, void* param); + void* mergeLimitReachedParam; }; void tsortSetSingleTableMerge(SSortHandle* pHandle) { @@ -1056,6 +1059,9 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH int64_t keepRows = pOrigBlk->info.rows; if (nRows >= pHandle->mergeLimit) { + if (pHandle->mergeLimitReachedFn) { + pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam); + } keepRows = pHandle->mergeLimit - prevRows; } @@ -1651,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke } return ret; } + +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) { + pHandle->mergeLimitReachedFn = mergeLimitReachedCb; + pHandle->mergeLimitReachedParam = param; +} From ca6b9f959a1f5a3f3cdecb0f9331aac8ec73370c Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 25 Jan 2024 17:02:08 +0800 Subject: [PATCH 08/12] fix: printSlowLog heap over flow --- source/util/src/tlog.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index bd6c37a7b5..505ce61eca 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -573,6 +573,9 @@ void taosPrintSlowLog(const char *format, ...) { len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer); va_end(argpointer); + if (len < 0 || len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2) { + len = LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2; + } buffer[len++] = '\n'; buffer[len] = 0; From 82a27331564c29baebdc518c409a6cdcd1874847 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Fri, 26 Jan 2024 15:50:33 +0800 Subject: [PATCH 09/12] fix: update python connector version --- Jenkinsfile2 | 2 +- tests/parallel_test/run_case.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d3fc05a1d2..e9372ab686 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -306,7 +306,7 @@ def pre_test_build_win() { cd %WIN_CONNECTOR_ROOT% python.exe -m pip install --upgrade pip python -m pip uninstall taospy -y - python -m pip install taospy==2.7.12 + python -m pip install taospy==2.7.13 python -m pip uninstall taos-ws-py -y python -m pip install taos-ws-py==0.3.1 xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 7c80ecdbb7..6b99e7a54e 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so #define taospy 2.7.10 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install --default-timeout=120 taospy==2.7.12 +pip3 install --default-timeout=120 taospy==2.7.13 #define taos-ws-py 0.3.1 pip3 list|grep taos-ws-py From 54a00b3c219eb71d4595bcab2ab08ceeb184d1b7 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Fri, 26 Jan 2024 15:52:32 +0800 Subject: [PATCH 10/12] fix: update python connector version --- tests/parallel_test/run_case.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 6b99e7a54e..7429c9976b 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so #define taospy 2.7.10 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install --default-timeout=120 taospy==2.7.13 +pip3 install --default-timeout=120 taospy==2.7.13 #define taos-ws-py 0.3.1 pip3 list|grep taos-ws-py From f86a8248d2b1589179232b452c33b78ef080c48b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 29 Jan 2024 10:19:01 +0800 Subject: [PATCH 11/12] fix: remove stmt assert --- source/client/src/clientStmt.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 8ac9550aca..36a3e50aef 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { if (pStmt->bInfo.inExecCache) { - if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) { - tscError("stmtGetFromCache error"); - return TSDB_CODE_TSC_STMT_CACHE_ERROR; - } pStmt->bInfo.needParse = false; tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName); return TSDB_CODE_SUCCESS; From 9087a0d9d049c2808d353b92e1d9499c2358ce90 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Jan 2024 11:03:50 +0800 Subject: [PATCH 12/12] feat(stream): drop orphan tasks. --- include/common/tmsg.h | 2 +- source/dnode/mnode/impl/inc/mndStream.h | 16 +- source/dnode/mnode/impl/src/mndSma.c | 4 +- source/dnode/mnode/impl/src/mndStream.c | 48 +----- source/dnode/mnode/impl/src/mndStreamHb.c | 84 ++++++++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 171 ++++++++++++++++++-- 6 files changed, 237 insertions(+), 88 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c314d82036..c4da3194e0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3323,7 +3323,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq; +} SVPauseStreamTaskReq, SVResetStreamTaskReq; typedef struct { int8_t reserved; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 92035101f6..d884227249 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -69,12 +69,6 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SFailedCheckpointInfo { - int64_t streamUid; - int64_t checkpointId; - int32_t transId; -} SFailedCheckpointInfo; - #define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" #define MND_STREAM_PAUSE_NAME "stream-pause" @@ -97,9 +91,14 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId) bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); +typedef struct SOrphanTask { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SOrphanTask; + // for sma // TODO refactor -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); @@ -119,7 +118,8 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) int32_t initStreamNodeList(SMnode *pMnode); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - +int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index a89136e7d3..e6027a0332 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p sdbRelease(pMnode->pSdb, pStream); goto _OVER; } else { - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); sdbRelease(pMnode->pSdb, pStream); goto _OVER; @@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p SStreamObj *pStream = mndAcquireStream(pMnode, streamName); if (pStream != NULL && pStream->smaId == pSma->uid) { - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); mndReleaseStream(pMnode, pStream); goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7b348172f2..46c7a06079 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -608,50 +608,6 @@ _OVER: return -1; } -static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction - terrno = code; - return -1; - } - - // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); - if (code != 0) { - taosMemoryFree(pReq); - return -1; - } - - return 0; -} - -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - int32_t lv = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < lv; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) { - return -1; - } - } - } - return 0; -} - static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; @@ -1200,7 +1156,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); // drop all tasks - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1264,7 +1220,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return -1; } else { #if 0 - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); sdbRelease(pMnode->pSdb, pStream); sdbCancelFetch(pSdb, pIter); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index e4599edbd4..5a6faadebb 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,6 +16,12 @@ #include "mndStream.h" #include "mndTrans.h" +typedef struct SFailedCheckpointInfo { + int64_t streamUid; + int64_t checkpointId; + int32_t transId; +} SFailedCheckpointInfo; + static void doExtractTasksFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -177,10 +183,51 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { return TSDB_CODE_SUCCESS; } +static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) { + SOrphanTask* pTask = taosArrayGet(pList, 0); + + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); + if (conflict) { + return -1; + } + + SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; + STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream"); + if (pTrans == NULL) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + return -1; + } + + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); + + // drop all tasks + if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mndTransDrop(pTrans); + return -1; + } + + // drop stream + if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) { + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + return 0; +} + int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); @@ -198,8 +245,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); // extract stream task list - int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); - if (numOfExisted == 0) { + if (taosHashGetSize(execInfo.pTaskMap) == 0) { doExtractTasksFromStream(pMnode); } @@ -218,6 +264,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); + + SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; + taosArrayPush(pOrphanTasks, &oTask); continue; } @@ -240,15 +289,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } streamTaskStatusCopy(pTaskEntry, p); - if (p->checkpointId != 0) { - if (p->checkpointFailed) { - mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); + if ((p->checkpointId != 0) && p->checkpointFailed) { + mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, + p->checkpointId, p->chkpointTransId); - SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pList, &info); - } + SFailedCheckpointInfo info = { + .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + addIntoCheckpointList(pFailedTasks, &info); } } @@ -266,15 +313,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal - if (taosArrayGetSize(pList) > 0) { + if (taosArrayGetSize(pFailedTasks) > 0) { bool allReady = true; SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i); + for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", pInfo->checkpointId, pInfo->transId); @@ -285,9 +332,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } + // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors. + if (taosArrayGetSize(pOrphanTasks) > 0) { + mndDropOrphanTasks(pMnode, pOrphanTasks); + } + taosThreadMutexUnlock(&execInfo.lock); streamMetaClearHbMsg(&req); - taosArrayDestroy(pList); + taosArrayDestroy(pFailedTasks); + taosArrayDestroy(pOrphanTasks); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 2ee73528e0..5d6e34856b 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -18,6 +18,66 @@ #include "tmisce.h" #include "mndVgroup.h" +typedef struct SStreamTaskIter { + SStreamObj *pStream; + int32_t level; + int32_t ordinalIndex; + int32_t totalLevel; + SStreamTask *pTask; +} SStreamTaskIter; + +SStreamTaskIter* createTaskIter(SStreamObj* pStream) { + SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->level = -1; + pIter->ordinalIndex = 0; + pIter->pStream = pStream; + pIter->totalLevel = taosArrayGetSize(pStream->tasks); + pIter->pTask = NULL; + + return pIter; +} + +bool taskIterNextTask(SStreamTaskIter* pIter) { + if (pIter->level >= pIter->totalLevel) { + pIter->pTask = NULL; + return false; + } + + if (pIter->level == -1) { + pIter->level += 1; + } + + while(pIter->level < pIter->totalLevel) { + SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level); + if (pIter->ordinalIndex >= taosArrayGetSize(pList)) { + pIter->level += 1; + pIter->ordinalIndex = 0; + pIter->pTask = NULL; + continue; + } + + pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex); + pIter->ordinalIndex += 1; + return true; + } + + pIter->pTask = NULL; + return false; +} + +SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) { + return pIter->pTask; +} + +void destroyTaskIter(SStreamTaskIter* pIter) { + taosMemoryFree(pIter); +} + SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -251,24 +311,103 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SArray *tasks = pStream->tasks; + SStreamTaskIter *pIter = createTaskIter(pStream); - int32_t size = taosArrayGetSize(tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - - if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { - return -1; - } - - if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { + destroyTaskIter(pIter); + return -1; } + + if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { + atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + } + } + + destroyTaskIter(pIter); + return 0; +} + +static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction + terrno = code; + return -1; + } + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + if (code != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SStreamTaskIter *pIter = createTaskIter(pStream); + + while(taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (doSetDropAction(pMnode, pTrans, pTask) < 0) { + destroyTaskIter(pIter); + return -1; + } + } + destroyTaskIter(pIter); + return 0; +} + +static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->taskId; + pReq->streamId = pTask->streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId); + if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction + terrno = code; + taosMemoryFree(pReq); + return -1; + } + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + if (code != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SOrphanTask* pTask = taosArrayGet(pList, i); + mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); + doSetDropActionFromId(pMnode, pTrans, pTask); } return 0; } \ No newline at end of file