diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cb10aeb6a0..e6d750468e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo; #define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 +extern int32_t streamMetaId; + enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -135,11 +137,6 @@ enum { STREAM_QUEUE__PROCESSING, }; -enum { - STREAM_META_WILL_STOP = 1, - STREAM_META_OK_TO_STOP = 2, -}; - typedef enum EStreamTaskEvent { TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT_SCANHIST = 0x2, @@ -282,7 +279,6 @@ typedef enum { } EConsenChkptStatus; typedef struct SConsenChkptInfo { -// bool alreadySendChkptId; EConsenChkptStatus status; int64_t statusTs; int32_t consenChkptTransId; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b0bf89029e..3ec269ec22 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,8 +16,13 @@ #include "tq.h" #include "vnd.h" -#define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 100 +#define MAX_REPEAT_SCAN_THRESHOLD 3 +#define SCAN_WAL_IDLE_DURATION 100 + +typedef struct SBuildScanWalMsgParam { + int64_t metaId; + int32_t numOfTasks; +} SBuildScanWalMsgParam; static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); @@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); + int32_t numOfTasks = 0; + bool shouldIdle = true; tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); // check all tasks - int32_t numOfTasks = 0; - bool shouldIdle = true; - int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); if (code) { tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); @@ -68,54 +72,61 @@ int32_t tqScanWal(STQ* pTq) { return code; } -typedef struct SBuildScanWalMsgParam { - STQ* pTq; - int32_t numOfTasks; -} SBuildScanWalMsgParam; - static void doStartScanWal(void* param, void* tmrId) { + int32_t vgId = 0; + STQ* pTq = NULL; + int32_t code = 0; + SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; - STQ* pTq = pParam->pTq; - int32_t vgId = pTq->pStreamMeta->vgId; + SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId); + if (pMeta == NULL) { + tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); + taosMemoryFree(pParam); + return; + } + + vgId = pMeta->vgId; + pTq = pMeta->ahandle; + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, pTq->pVnode->restored); - int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - taosMemoryFree(pParam); - + code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } + + code = taosReleaseRef(streamMetaId, pParam->metaId); + if (code) { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + + taosMemoryFree(pParam); } int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t code = 0; - int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + tmr_h pTimer = NULL; + SBuildScanWalMsgParam* pParam = NULL; - SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); + pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); if (pParam == NULL) { return terrno; } - pParam->pTq = pTq; + pParam->metaId = pMeta->rid; pParam->numOfTasks = numOfTasks; - tmr_h pTimer = NULL; code = streamTimerGetInstance(&pTimer); if (code) { tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); - return code; - } - - if (pMeta->scanInfo.scanTimer == NULL) { - pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer); + taosMemoryFree(pParam); } else { - bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer); - if (!ret) { -// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration); - } + streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); } return code; @@ -124,8 +135,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - - bool alreadyRestored = pTq->pVnode->restored; + bool alreadyRestored = pTq->pVnode->restored; + int32_t numOfTasks = 0; // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { @@ -134,7 +145,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { streamMetaWLock(pMeta); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); streamMetaWUnLock(pMeta); @@ -378,13 +389,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pTaskId = taosArrayGet(pTaskList, i); + STaskId* pTaskId = taosArrayGet(pTaskList, i); if (pTaskId == NULL) { continue; } SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); if (pTask == NULL || code != 0) { continue; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c3228f59bf..cbf392f67e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) { int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); - int32_t lino = 0; int64_t curOwner = 0; *pRes = NULL; @@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes); if (code) { pTaskInfo->code = code; - qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo)); } blockDataCheck(*pRes, false); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 8c76b1a1c3..64a07c4653 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1301,10 +1301,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM); pOperator->pDownstreamGetParams[idx] = NULL; } + + if (code) { + qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code)); + } return code; } code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock); + if (code) { + qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1c9279b84f..9cf2a3ea17 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -86,11 +86,13 @@ static void destroyGroupOperatorInfo(void* param) { taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey); cleanupExprSupp(&pInfo->scalarSup); - if (pInfo->pOperator) { + + if (pInfo->pOperator != NULL) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); pInfo->pOperator = NULL; } + cleanupGroupResInfo(&pInfo->groupResInfo); cleanupAggSup(&pInfo->aggSup); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 45cd755f78..7fd6b91e52 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); blockDataCheck(*ppBlock, false); + if (code) { + qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e1153135d2..bae9926f63 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1380,8 +1380,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); if (code != TSDB_CODE_SUCCESS) { taosRUnLockLatch(&pTaskInfo->lock); - lino = __LINE__; - goto _end; + TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->currentTable >= numOfTables) { @@ -1393,11 +1392,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); if (!tmp) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); taosRUnLockLatch(&pTaskInfo->lock); (*ppRes) = NULL; - return terrno; + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } + tInfo = *tmp; taosRUnLockLatch(&pTaskInfo->lock); @@ -1412,11 +1411,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { } } else { // scan table group by group sequentially code = groupSeqTableScan(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); } _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } @@ -5834,9 +5834,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STupleHandle* pTupleHandle = NULL; blockDataCleanup(pResBlock); - STupleHandle* pTupleHandle = NULL; + while (1) { while (1) { pTupleHandle = NULL; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 11b3fa8c70..1c241dffec 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -204,15 +204,18 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys * @param [in, out] pBlock the output block, the group id will be saved in it * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple - * @retval NULL if no more tuples */ -static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) { - int32_t code = 0; +static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock, + STupleHandle** pTupleHandle) { + QRY_PARAM_CHECK(pTupleHandle); + + int32_t code = 0; STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple; if (!retTuple) { code = tsortNextTuple(pHandle, &retTuple); if (code) { - return NULL; + qError("failed to get next tuple, code:%s", tstrerror(code)); + return code; } } @@ -225,7 +228,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf, &pInfo->pGroupIdCalc->lastKeysLen, retTuple); } - bool emptyBlock = pBlock->info.rows == 0; + + bool emptyBlock = (pBlock->info.rows == 0); if (newGroup) { if (!emptyBlock) { // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return @@ -247,17 +251,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf } } - return retTuple; + *pTupleHandle = retTuple; + return code; } static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); blockDataCleanup(pDataBlock); - int32_t lino = 0; - int32_t code = 0; - SSDataBlock* p = NULL; + int32_t lino = 0; + int32_t code = 0; + STupleHandle* pTupleHandle = NULL; + SSDataBlock* p = NULL; + code = tsortGetSortedDataBlock(pHandle, &p); if (p == NULL || (code != 0)) { return code; @@ -266,16 +273,15 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, code = blockDataEnsureCapacity(p, capacity); QUERY_CHECK_CODE(code, lino, _error); - STupleHandle* pTupleHandle; while (1) { if (pInfo->pGroupIdCalc) { - pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p); + code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle); } else { code = tsortNextTuple(pHandle, &pTupleHandle); } - if (pTupleHandle == NULL || code != 0) { - lino = __LINE__; + TSDB_CHECK_CODE(code, lino, _error); + if (pTupleHandle == NULL) { break; } @@ -320,7 +326,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, return code; _error: - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); blockDataDestroy(p); return code; @@ -330,6 +336,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock); blockDataCheck(*ppBlock, false); + if (code) { + qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cacc4f4cee..8164281871 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1229,11 +1229,13 @@ static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); - if (pInfo->pOperator) { + + if (pInfo->pOperator != NULL) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); pInfo->pOperator = NULL; } + cleanupExprSupp(&pInfo->scalarSup); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); @@ -1251,13 +1253,17 @@ void destroyIntervalOperatorInfo(void* param) { if (param == NULL) { return; } + SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); - if (pInfo->pOperator) { + + if (pInfo->pOperator != NULL) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); pInfo->pOperator = NULL; } + cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); @@ -1265,6 +1271,7 @@ void destroyIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; + taosArrayDestroyEx(pInfo->pPrevValues, freeItem); pInfo->pPrevValues = NULL; @@ -1358,6 +1365,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = terrno; + lino = __LINE__; goto _error; } @@ -1465,8 +1473,10 @@ _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; + qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code)); return code; } @@ -1754,11 +1764,13 @@ void destroySWindowOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); - if (pInfo->pOperator) { + + if (pInfo->pOperator != NULL) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable); pInfo->pOperator = NULL; } + cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSupp); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6e5d2cadf5..17c390e239 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); if (code != TSDB_CODE_SUCCESS) { - return terrno = code; + return code; } if (pHandle->pDataBlock->info.rows >= capacity) { @@ -2869,6 +2869,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle pHandle->tupleHandle.pBlock = NULL; return code; } + pHandle->tupleHandle.pBlock = pBlock; pHandle->tupleHandle.rowIndex = 0; } @@ -2884,8 +2885,7 @@ int32_t tsortOpen(SSortHandle* pHandle) { } if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - code = TSDB_CODE_INVALID_PARA; - return code; + return TSDB_CODE_INVALID_PARA; } pHandle->opened = true; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a5c5c1b775..94c196d280 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -164,7 +164,6 @@ extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; -extern int32_t streamMetaId; int32_t streamTimerInit(); void streamTimerCleanUp();