From 2b78b660bdbdab53e1e39cdd4664d2087bc048d1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Sep 2024 18:19:47 +0800 Subject: [PATCH 01/11] refactor: update the error logs. --- source/libs/executor/src/executor.c | 3 +- source/libs/executor/src/executorInt.c | 7 +++++ source/libs/executor/src/mergeoperator.c | 4 +++ source/libs/executor/src/operator.c | 1 + source/libs/executor/src/scanoperator.c | 13 ++++---- source/libs/executor/src/sortoperator.c | 40 +++++++++++++----------- source/libs/executor/src/tsort.c | 6 ++-- 7 files changed, 45 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cd43c5c99e..f57eb9d64b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); - int32_t lino = 0; int64_t curOwner = 0; *pRes = NULL; @@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes); if (code) { pTaskInfo->code = code; - qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo)); } blockDataCheck(*pRes, false); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 1804f0ce26..33e7e2f981 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1297,10 +1297,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM); pOperator->pDownstreamGetParams[idx] = NULL; } + + if (code) { + qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code)); + } return code; } code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock); + if (code) { + qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 49973ac373..64a0857d67 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); blockDataCheck(*ppBlock, false); + if (code) { + qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); + } return code; } @@ -518,6 +521,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) { code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock); if (code) { + qError("failed to get next data block from upstream, code:%s", tstrerror(code)); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index fe2f3f8dfe..90031685a8 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -889,6 +889,7 @@ int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* } else { code = pOperator->fpSet.getNextFn(pOperator, pRes); if (code) { + qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); pOperator->pTaskInfo->code = code; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b6b5c5484e..345812eec9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1378,8 +1378,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); if (code != TSDB_CODE_SUCCESS) { taosRUnLockLatch(&pTaskInfo->lock); - lino = __LINE__; - goto _end; + TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->currentTable >= numOfTables) { @@ -1391,11 +1390,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); if (!tmp) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); taosRUnLockLatch(&pTaskInfo->lock); (*ppRes) = NULL; - return terrno; + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } + tInfo = *tmp; taosRUnLockLatch(&pTaskInfo->lock); @@ -1410,11 +1409,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { } } else { // scan table group by group sequentially code = groupSeqTableScan(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); } _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code)); pTaskInfo->code = code; } @@ -5820,9 +5820,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STupleHandle* pTupleHandle = NULL; blockDataCleanup(pResBlock); - STupleHandle* pTupleHandle = NULL; + while (1) { while (1) { pTupleHandle = NULL; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 27ae5e7281..9ec63eca4e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -204,16 +204,17 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys * @param [in, out] pBlock the output block, the group id will be saved in it * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple - * @retval NULL if no more tuples */ -static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) { - int32_t code = 0; +static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock, + STupleHandle** pTupleHandle) { + QRY_PARAM_CHECK(pTupleHandle); + + int32_t code = 0; STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple; if (!retTuple) { code = tsortNextTuple(pHandle, &retTuple); - if (code) { - return NULL; - } + qError("failed to get next tuple, code:%s", tstrerror(code)); + return code; } if (retTuple) { @@ -225,7 +226,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf, &pInfo->pGroupIdCalc->lastKeysLen, retTuple); } - bool emptyBlock = pBlock->info.rows == 0; + + bool emptyBlock = (pBlock->info.rows == 0); if (newGroup) { if (!emptyBlock) { // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return @@ -247,17 +249,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf } } - return retTuple; + *pTupleHandle = retTuple; + return code; } static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); blockDataCleanup(pDataBlock); - int32_t lino = 0; - int32_t code = 0; - SSDataBlock* p = NULL; + int32_t lino = 0; + int32_t code = 0; + STupleHandle* pTupleHandle = NULL; + SSDataBlock* p = NULL; + code = tsortGetSortedDataBlock(pHandle, &p); if (p == NULL || (code != 0)) { return code; @@ -266,18 +271,14 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, code = blockDataEnsureCapacity(p, capacity); QUERY_CHECK_CODE(code, lino, _error); - STupleHandle* pTupleHandle; while (1) { if (pInfo->pGroupIdCalc) { - pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p); + code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle); } else { code = tsortNextTuple(pHandle, &pTupleHandle); } - if (pTupleHandle == NULL || code != 0) { - lino = __LINE__; - break; - } + TSDB_CHECK_CODE(code, lino, _error); code = appendOneRowToDataBlock(p, pTupleHandle); QUERY_CHECK_CODE(code, lino, _error); @@ -320,7 +321,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, return code; _error: - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); blockDataDestroy(p); return code; @@ -330,6 +331,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); blockDataCheck(*ppBlock, false); + if (code) { + qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 19b825b0ca..ff064f1727 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); if (code != TSDB_CODE_SUCCESS) { - return terrno = code; + return code; } if (pHandle->pDataBlock->info.rows >= capacity) { @@ -2867,6 +2867,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle pHandle->tupleHandle.pBlock = NULL; return code; } + pHandle->tupleHandle.pBlock = pBlock; pHandle->tupleHandle.rowIndex = 0; } @@ -2882,8 +2883,7 @@ int32_t tsortOpen(SSortHandle* pHandle) { } if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - code = TSDB_CODE_INVALID_PARA; - return code; + return TSDB_CODE_INVALID_PARA; } pHandle->opened = true; From c1333a920d0ed03084a23ae0b2914c34cf764112 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Sep 2024 18:58:49 +0800 Subject: [PATCH 02/11] fix(query): check for null. --- source/libs/executor/src/sortoperator.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 5cac3f5f86..95c06865f0 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -279,6 +279,9 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, } TSDB_CHECK_CODE(code, lino, _error); + if (pTupleHandle == NULL) { + break; + } code = appendOneRowToDataBlock(p, pTupleHandle); QUERY_CHECK_CODE(code, lino, _error); From 922b0681eaff1d37b3798f37311def048624a2d7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Sep 2024 19:38:49 +0800 Subject: [PATCH 03/11] fix(query): check for null ptr. --- source/libs/executor/src/timewindowoperator.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6ac24ad313..ca9a04c3fd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1246,11 +1246,17 @@ void destroyIntervalOperatorInfo(void* param) { if (param == NULL) { return; } + SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); - pInfo->pOperator = NULL; + + if (pInfo->pOperator) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); @@ -1258,6 +1264,7 @@ void destroyIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; + taosArrayDestroyEx(pInfo->pPrevValues, freeItem); pInfo->pPrevValues = NULL; @@ -1351,6 +1358,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = terrno; + lino = __LINE__; goto _error; } @@ -1458,8 +1466,10 @@ _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; + qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code)); return code; } From 3c7f718210e97742e62a99aefa18f57b818c39ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Sep 2024 09:37:23 +0800 Subject: [PATCH 04/11] fix(query): check for null ptr. --- source/libs/executor/src/aggregateoperator.c | 9 +++++--- source/libs/executor/src/groupoperator.c | 10 ++++++--- .../executor/src/streamtimewindowoperator.c | 10 ++++++--- source/libs/executor/src/timewindowoperator.c | 22 +++++++++++++------ 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 863ce01256..4c1c2e0d89 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -155,9 +155,12 @@ void destroyAggOperatorInfo(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); - pInfo->pOperator = NULL; + if (pInfo->pOperator != NULL) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarExprSup); cleanupGroupResInfo(&pInfo->groupResInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3ce20dbbd9..34300a068e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -86,11 +86,15 @@ static void destroyGroupOperatorInfo(void* param) { taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey); cleanupExprSupp(&pInfo->scalarSup); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + + if (pInfo->pOperator != NULL) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupGroupResInfo(&pInfo->groupResInfo); cleanupAggSup(&pInfo->aggSup); - pInfo->pOperator = NULL; taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 22e462abab..ccc14d7c78 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -473,9 +473,13 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { } SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); - pInfo->pOperator = NULL; + + if (pInfo->pOperator != NULL) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupAggSup(&pInfo->aggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ca9a04c3fd..702fa75d93 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1226,9 +1226,13 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); - pInfo->pOperator = NULL; + + if (pInfo->pOperator != NULL) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupExprSupp(&pInfo->scalarSup); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); @@ -1251,7 +1255,7 @@ void destroyIntervalOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); - if (pInfo->pOperator) { + if (pInfo->pOperator != NULL) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); pInfo->pOperator = NULL; @@ -1757,9 +1761,13 @@ void destroySWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); - cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, - &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); - pInfo->pOperator = NULL; + + if (pInfo->pOperator != NULL) { + cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, + &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); + pInfo->pOperator = NULL; + } + cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); From 4c98786352c0d09e37975855b19ea619a1d9afee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 16:12:46 +0800 Subject: [PATCH 05/11] fix(stream): use meta id instead of ptr. --- include/libs/stream/tstream.h | 8 +-- source/dnode/vnode/src/tq/tqStreamTask.c | 65 ++++++++++++------------ source/libs/stream/inc/streamInt.h | 1 - 3 files changed, 35 insertions(+), 39 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cb10aeb6a0..e6d750468e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo; #define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 +extern int32_t streamMetaId; + enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -135,11 +137,6 @@ enum { STREAM_QUEUE__PROCESSING, }; -enum { - STREAM_META_WILL_STOP = 1, - STREAM_META_OK_TO_STOP = 2, -}; - typedef enum EStreamTaskEvent { TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT_SCANHIST = 0x2, @@ -282,7 +279,6 @@ typedef enum { } EConsenChkptStatus; typedef struct SConsenChkptInfo { -// bool alreadySendChkptId; EConsenChkptStatus status; int64_t statusTs; int32_t consenChkptTransId; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b0bf89029e..3c0ff751da 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,8 +16,13 @@ #include "tq.h" #include "vnd.h" -#define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 100 +#define MAX_REPEAT_SCAN_THRESHOLD 3 +#define SCAN_WAL_IDLE_DURATION 100 + +typedef struct SBuildScanWalMsgParam { + int64_t metaId; + int32_t numOfTasks; +} SBuildScanWalMsgParam; static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); @@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); + int32_t numOfTasks = 0; + bool shouldIdle = true; tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); // check all tasks - int32_t numOfTasks = 0; - bool shouldIdle = true; - int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); if (code) { tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); @@ -68,16 +72,19 @@ int32_t tqScanWal(STQ* pTq) { return code; } -typedef struct SBuildScanWalMsgParam { - STQ* pTq; - int32_t numOfTasks; -} SBuildScanWalMsgParam; - static void doStartScanWal(void* param, void* tmrId) { SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; - STQ* pTq = pParam->pTq; - int32_t vgId = pTq->pStreamMeta->vgId; + SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId); + if (pMeta == NULL) { + tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); + taosMemoryFree(pParam); + return; + } + + int32_t vgId = pMeta->vgId; + STQ* pTq = pMeta->ahandle; + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, pTq->pVnode->restored); @@ -90,42 +97,36 @@ static void doStartScanWal(void* param, void* tmrId) { } int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t code = 0; - int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + tmr_h pTimer = NULL; + SBuildScanWalMsgParam* pParam = NULL; - SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); + pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); if (pParam == NULL) { return terrno; } - pParam->pTq = pTq; + pParam->metaId = pMeta->rid; pParam->numOfTasks = numOfTasks; - tmr_h pTimer = NULL; code = streamTimerGetInstance(&pTimer); if (code) { tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); + taosMemoryFree(pParam); return code; } - if (pMeta->scanInfo.scanTimer == NULL) { - pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer); - } else { - bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer); - if (!ret) { -// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration); - } - } - + streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); return code; } int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - - bool alreadyRestored = pTq->pVnode->restored; + bool alreadyRestored = pTq->pVnode->restored; + int32_t numOfTasks = 0; // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { @@ -134,7 +135,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { streamMetaWLock(pMeta); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); streamMetaWUnLock(pMeta); @@ -378,13 +379,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pTaskId = taosArrayGet(pTaskList, i); + STaskId* pTaskId = taosArrayGet(pTaskList, i); if (pTaskId == NULL) { continue; } SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL || code != 0) { continue; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a5c5c1b775..94c196d280 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -164,7 +164,6 @@ extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; -extern int32_t streamMetaId; int32_t streamTimerInit(); void streamTimerCleanUp(); From e1719f8de4c9f85c0322375ff6847c45e2a09201 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 18:23:07 +0800 Subject: [PATCH 06/11] fix(stream): release ref. --- source/dnode/vnode/src/tq/tqStreamTask.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3c0ff751da..c40ea66487 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -89,11 +89,13 @@ static void doStartScanWal(void* param, void* tmrId) { pTq->pVnode->restored); int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - taosMemoryFree(pParam); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } + + taosReleaseRef(streamMetaId, pParam->metaId); + taosMemoryFree(pParam); } int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { From 2f65886d0118ce2991948a47731b091802129914 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 18:26:57 +0800 Subject: [PATCH 07/11] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqStreamTask.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index c40ea66487..10cd53dd30 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -117,10 +117,10 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { if (code) { tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); taosMemoryFree(pParam); - return code; + } else { + streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); } - streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); return code; } From a23e6c2ce9407781e40820b1b321b214dc70c9d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 18:43:26 +0800 Subject: [PATCH 08/11] fix(stream): handle return value. --- source/dnode/vnode/src/tq/tqStreamTask.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 10cd53dd30..62cafbbc2e 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -73,6 +73,10 @@ int32_t tqScanWal(STQ* pTq) { } static void doStartScanWal(void* param, void* tmrId) { + int32_t vgId = 0; + STQ* pTq = NULL; + int32_t code = 0; + SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId); @@ -82,19 +86,22 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - int32_t vgId = pMeta->vgId; - STQ* pTq = pMeta->ahandle; + vgId = pMeta->vgId; + pTq = pMeta->ahandle; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, pTq->pVnode->restored); - int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - + code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } - taosReleaseRef(streamMetaId, pParam->metaId); + code = taosReleaseRef(streamMetaId, pParam->metaId); + if (code) { + tqError("vgId:% failed to release ref for streamMeta, rid:%" PRId64, vgId, pParam->metaId, tstrerror(code)); + } + taosMemoryFree(pParam); } From 52be89c022c947c7298711c97d445ae8982b539d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 19:18:49 +0800 Subject: [PATCH 09/11] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 62cafbbc2e..2afa9f5eed 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -99,7 +99,7 @@ static void doStartScanWal(void* param, void* tmrId) { code = taosReleaseRef(streamMetaId, pParam->metaId); if (code) { - tqError("vgId:% failed to release ref for streamMeta, rid:%" PRId64, vgId, pParam->metaId, tstrerror(code)); + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64, vgId, pParam->metaId, tstrerror(code)); } taosMemoryFree(pParam); From 1e96ea4b5404f1025a6cad7f18d76ae2e4e2b914 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 21:41:12 +0800 Subject: [PATCH 10/11] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqStreamTask.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2afa9f5eed..3ec269ec22 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -99,7 +99,8 @@ static void doStartScanWal(void* param, void* tmrId) { code = taosReleaseRef(streamMetaId, pParam->metaId); if (code) { - tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64, vgId, pParam->metaId, tstrerror(code)); + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); } taosMemoryFree(pParam); From 4173144ded46ab242b80c6d19847577b5eb871ec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 Sep 2024 22:59:18 +0800 Subject: [PATCH 11/11] fix(stream): return value. --- source/libs/executor/src/sortoperator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index c495e59d94..1c241dffec 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -213,8 +213,10 @@ static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pIn STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple; if (!retTuple) { code = tsortNextTuple(pHandle, &retTuple); - qError("failed to get next tuple, code:%s", tstrerror(code)); - return code; + if (code) { + qError("failed to get next tuple, code:%s", tstrerror(code)); + return code; + } } if (retTuple) {