fix(stream): fix syntax error.

This commit is contained in:
Haojun Liao 2024-07-19 00:09:28 +08:00
parent 2ca6726922
commit 852dd833d0
12 changed files with 47 additions and 24 deletions

View File

@ -4648,7 +4648,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
}
if (pReader->resBlockInfo.freeBlock) {
pReader->resBlockInfo.pResBlock = blockDataDestroy(pReader->resBlockInfo.pResBlock);
blockDataDestroy(pReader->resBlockInfo.pResBlock);
pReader->resBlockInfo.pResBlock = NULL;
}
taosMemoryFree(pSupInfo->colId);

View File

@ -970,7 +970,7 @@ void cleanupExprSupp(SExprSupp* pSupp) {
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); }
void cleanupBasicInfo(SOptrBasicInfo* pInfo) { blockDataDestroy(pInfo->pRes); pInfo->pRes = NULL;}
bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;

View File

@ -310,8 +310,10 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
void destroyFillOperatorInfo(void* param) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);
blockDataDestroy(pInfo->pRes);
pInfo->pRes = NULL;
blockDataDestroy(pInfo->pFinalRes);
pInfo->pFinalRes = NULL;
cleanupExprSupp(&pInfo->noFillExprSupp);

View File

@ -1065,7 +1065,8 @@ static void destroyHashJoinOperator(void* param) {
hJoinFreeTableInfo(&pJoinOperator->tbs[0]);
hJoinFreeTableInfo(&pJoinOperator->tbs[1]);
pJoinOperator->finBlk = blockDataDestroy(pJoinOperator->finBlk);
blockDataDestroy(pJoinOperator->finBlk);
pJoinOperator->finBlk = NULL;
taosMemoryFreeClear(pJoinOperator->pResColMap);
taosArrayDestroyEx(pJoinOperator->pRowBufs, hJoinFreeBufPage);

View File

@ -3304,9 +3304,11 @@ void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) {
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
mWinJoinResetWindowCache(pCtx, &pCtx->cache);
pCtx->finBlk = blockDataDestroy(pCtx->finBlk);
pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk);
blockDataDestroy(pCtx->finBlk);
pCtx->finBlk = NULL;
blockDataDestroy(pCtx->cache.outBlk);
pCtx->cache.outBlk = NULL;
taosArrayDestroy(pCtx->cache.grps);
}
@ -3378,9 +3380,11 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
blockDataDestroy(pCtx->finBlk);
blockDataDestroy(pCtx->midBlk);
pCtx->finBlk = blockDataDestroy(pCtx->finBlk);
pCtx->midBlk = blockDataDestroy(pCtx->midBlk);
pCtx->finBlk = NULL;
pCtx->midBlk = NULL;
}

View File

@ -232,8 +232,11 @@ int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u
void destroySortMergeOperatorInfo(void* param) {
SSortMergeInfo* pSortMergeInfo = param;
pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock);
pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
blockDataDestroy(pSortMergeInfo->pInputBlock);
pSortMergeInfo->pInputBlock = NULL;
blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
pSortMergeInfo->pIntermediateBlock = NULL;
taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
@ -429,7 +432,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
void destroyMultiwayMergeOperatorInfo(void* param) {
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
blockDataDestroy(pInfo->binfo.pRes);
pInfo->binfo.pRes = NULL;
if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
(*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);

View File

@ -3785,7 +3785,8 @@ static void destroyTagScanOperatorInfo(void* param) {
taosArrayDestroy(pInfo->aFilterIdxs);
taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pRes);
pInfo->pRes = NULL;
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
taosMemoryFreeClear(param);
@ -4738,15 +4739,18 @@ void destroyTableMergeScanOperatorInfo(void* param) {
pTableScanInfo->pSortHandle = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pSortInputBlock = NULL;
// end one reader variable
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
return;blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pResBlock = NULL;
// remove it from the task->result list
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
blockDataDestroy(pTableScanInfo->pReaderBlock);
pTableScanInfo->pReaderBlock = NULL;
taosArrayDestroy(pTableScanInfo->pSortInfo);
stopSubTablesTableMergeScan(pTableScanInfo);

View File

@ -368,7 +368,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
void destroySortOperatorInfo(void* param) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
blockDataDestroy(pInfo->binfo.pRes);
pInfo->binfo.pRes = NULL;
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
@ -611,7 +612,8 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u
void destroyGroupSortOperatorInfo(void* param) {
SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
blockDataDestroy(pInfo->binfo.pRes);
pInfo->binfo.pRes = NULL;
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->matchInfo.pList);

View File

@ -1087,7 +1087,8 @@ _error:
void destroyTimeSliceOperatorInfo(void* param) {
STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pRes);
pInfo->pRes = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);

View File

@ -555,7 +555,8 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
blockDataDestroy(pSource->src.pBlock);
pSource->src.pBlock = NULL;
} else {
if (pSource->pageIndex % 512 == 0) {
qDebug("begin source %p page %d", pSource, pSource->pageIndex);

View File

@ -1491,7 +1491,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
{
// do send response with the input status
code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
return code;

View File

@ -57,10 +57,13 @@ static void streamMetaEnvInit() {
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
metaRefMgtInit();
streamTimerInit();
int32_t code = streamTimerInit();
if (code != 0) {
stError("failed to init stream meta env, start failed");
}
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaInit() { (void) taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);