From 852dd833d05719db736f8ebf0c959e243306fccf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 Jul 2024 00:09:28 +0800 Subject: [PATCH] fix(stream): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- source/libs/executor/src/executorInt.c | 2 +- source/libs/executor/src/filloperator.c | 6 ++++-- source/libs/executor/src/hashjoinoperator.c | 3 ++- source/libs/executor/src/mergejoin.c | 14 +++++++++----- source/libs/executor/src/mergeoperator.c | 10 +++++++--- source/libs/executor/src/scanoperator.c | 12 ++++++++---- source/libs/executor/src/sortoperator.c | 6 ++++-- source/libs/executor/src/timesliceoperator.c | 3 ++- source/libs/executor/src/tsort.c | 3 ++- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamMeta.c | 7 +++++-- 12 files changed, 47 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2474666e93..23e396b6a4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4648,7 +4648,8 @@ void tsdbReaderClose2(STsdbReader* pReader) { } if (pReader->resBlockInfo.freeBlock) { - pReader->resBlockInfo.pResBlock = blockDataDestroy(pReader->resBlockInfo.pResBlock); + blockDataDestroy(pReader->resBlockInfo.pResBlock); + pReader->resBlockInfo.pResBlock = NULL; } taosMemoryFree(pSupInfo->colId); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index fad2b263b2..0476a7981c 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -970,7 +970,7 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } +void cleanupBasicInfo(SOptrBasicInfo* pInfo) { blockDataDestroy(pInfo->pRes); pInfo->pRes = NULL;} bool groupbyTbname(SNodeList* pGroupList) { bool bytbname = false; diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index e1aa75d828..ad1c6d67d4 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -310,8 +310,10 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { void destroyFillOperatorInfo(void* param) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = blockDataDestroy(pInfo->pRes); - pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes); + blockDataDestroy(pInfo->pRes); + pInfo->pRes = NULL; + blockDataDestroy(pInfo->pFinalRes); + pInfo->pFinalRes = NULL; cleanupExprSupp(&pInfo->noFillExprSupp); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 2fe2ccc56f..adc1055a6b 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1065,7 +1065,8 @@ static void destroyHashJoinOperator(void* param) { hJoinFreeTableInfo(&pJoinOperator->tbs[0]); hJoinFreeTableInfo(&pJoinOperator->tbs[1]); - pJoinOperator->finBlk = blockDataDestroy(pJoinOperator->finBlk); + blockDataDestroy(pJoinOperator->finBlk); + pJoinOperator->finBlk = NULL; taosMemoryFreeClear(pJoinOperator->pResColMap); taosArrayDestroyEx(pJoinOperator->pRowBufs, hJoinFreeBufPage); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 5f0a2eadfb..50ce604a7c 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -3304,9 +3304,11 @@ void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) { SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; mWinJoinResetWindowCache(pCtx, &pCtx->cache); - - pCtx->finBlk = blockDataDestroy(pCtx->finBlk); - pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk); + + blockDataDestroy(pCtx->finBlk); + pCtx->finBlk = NULL; + blockDataDestroy(pCtx->cache.outBlk); + pCtx->cache.outBlk = NULL; taosArrayDestroy(pCtx->cache.grps); } @@ -3378,9 +3380,11 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) { SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx; + blockDataDestroy(pCtx->finBlk); + blockDataDestroy(pCtx->midBlk); - pCtx->finBlk = blockDataDestroy(pCtx->finBlk); - pCtx->midBlk = blockDataDestroy(pCtx->midBlk); + pCtx->finBlk = NULL; + pCtx->midBlk = NULL; } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 993e8c72fd..2816bae03c 100755 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -232,8 +232,11 @@ int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u void destroySortMergeOperatorInfo(void* param) { SSortMergeInfo* pSortMergeInfo = param; - pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock); - pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock); + blockDataDestroy(pSortMergeInfo->pInputBlock); + pSortMergeInfo->pInputBlock = NULL; + + blockDataDestroy(pSortMergeInfo->pIntermediateBlock); + pSortMergeInfo->pIntermediateBlock = NULL; taosArrayDestroy(pSortMergeInfo->matchInfo.pList); @@ -429,7 +432,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { void destroyMultiwayMergeOperatorInfo(void* param) { SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + blockDataDestroy(pInfo->binfo.pRes); + pInfo->binfo.pRes = NULL; if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) { (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 678d8d569e..b6f24bf367 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3785,7 +3785,8 @@ static void destroyTagScanOperatorInfo(void* param) { taosArrayDestroy(pInfo->aFilterIdxs); taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag); - pInfo->pRes = blockDataDestroy(pInfo->pRes); + blockDataDestroy(pInfo->pRes); + pInfo->pRes = NULL; taosArrayDestroy(pInfo->matchInfo.pList); pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); taosMemoryFreeClear(param); @@ -4738,15 +4739,18 @@ void destroyTableMergeScanOperatorInfo(void* param) { pTableScanInfo->pSortHandle = NULL; taosHashCleanup(pTableScanInfo->mSkipTables); pTableScanInfo->mSkipTables = NULL; - pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + blockDataDestroy(pTableScanInfo->pSortInputBlock); + pTableScanInfo->pSortInputBlock = NULL; // end one reader variable cleanupQueryTableDataCond(&pTableScanInfo->base.cond); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); - pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); + return;blockDataDestroy(pTableScanInfo->pResBlock); + pTableScanInfo->pResBlock = NULL; // remove it from the task->result list - pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock); + blockDataDestroy(pTableScanInfo->pReaderBlock); + pTableScanInfo->pReaderBlock = NULL; taosArrayDestroy(pTableScanInfo->pSortInfo); stopSubTablesTableMergeScan(pTableScanInfo); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 507dbe7ee2..82eebf5310 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -368,7 +368,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { void destroySortOperatorInfo(void* param) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + blockDataDestroy(pInfo->binfo.pRes); + pInfo->binfo.pRes = NULL; tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); @@ -611,7 +612,8 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u void destroyGroupSortOperatorInfo(void* param) { SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + blockDataDestroy(pInfo->binfo.pRes); + pInfo->binfo.pRes = NULL; taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->matchInfo.pList); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index cda22fa320..cdcc702629 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1087,7 +1087,8 @@ _error: void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; - pInfo->pRes = blockDataDestroy(pInfo->pRes); + blockDataDestroy(pInfo->pRes); + pInfo->pRes = NULL; for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index d9bcc954a4..6d88eaef99 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -555,7 +555,8 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT (*numOfCompleted) += 1; pSource->src.rowIndex = -1; pSource->pageIndex = -1; - pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); + blockDataDestroy(pSource->src.pBlock); + pSource->src.pBlock = NULL; } else { if (pSource->pageIndex % 512 == 0) { qDebug("begin source %p page %d", pSource, pSource->pageIndex); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d59bb0dd91..4e128ace54 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1491,7 +1491,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S { // do send response with the input status - code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 806eb0ce91..75dc327104 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -57,10 +57,13 @@ static void streamMetaEnvInit() { streamMetaId = taosOpenRef(64, streamMetaCloseImpl); metaRefMgtInit(); - streamTimerInit(); + int32_t code = streamTimerInit(); + if (code != 0) { + stError("failed to init stream meta env, start failed"); + } } -void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { taosCloseRef(streamBackendId);