diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 6fc50bb860..be27f277c0 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -49,7 +49,7 @@ #define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint" #define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint" -#define MAX_STREAM_HISTORY_RESULT 20000000 +#define MAX_STREAM_HISTORY_RESULT 20000000 typedef struct SStateWindowInfo { SResultWindowInfo winInfo; @@ -449,7 +449,7 @@ void destroyFlusedPos(void* pRes) { } void destroyFlusedppPos(void* ppRes) { - void *pRes = *(void **)ppRes; + void* pRes = *(void**)ppRes; destroyFlusedPos(pRes); } @@ -507,7 +507,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { } taosArrayDestroy(pInfo->pMidPullDatas); - if (pInfo->pState !=NULL && pInfo->pState->dump == 1) { + if (pInfo->pState != NULL && pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); taosMemoryFreeClear(pInfo->pState->pTdbState); } @@ -548,7 +548,8 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } -int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) { +int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, + struct SSteamOpBasicInfo* pBasic) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; @@ -1028,7 +1029,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN } static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, - SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { + SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; @@ -1661,7 +1662,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); QUERY_CHECK_NULL(delWins, code, lino, _end, terrno); SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); @@ -1897,9 +1898,8 @@ _end: } } -int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, - SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1959,8 +1959,8 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); @@ -2042,11 +2042,13 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pOperator->info = pInfo; if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, - optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = + createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { - pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, - optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = + createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || @@ -2220,10 +2222,10 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); - pSup->pState->pFileState = NULL; - code = pSup->stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, - pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState); + pSup->pState->pFileState = NULL; + code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, + sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, + pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState); QUERY_CHECK_CODE(code, lino, _end); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3309,8 +3311,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); - code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, + sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -3772,8 +3774,8 @@ _end: } } -int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -3802,12 +3804,12 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } SExprSupp* pExpSup = &pOperator->exprSupp; - + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); pInfo->binfo.pRes = pResBlock; - SExprInfo* pExprInfo = NULL; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); @@ -3886,8 +3888,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode QUERY_CHECK_CODE(code, lino, _error); } } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo, - optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); if (downstream) { @@ -4102,8 +4105,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, - SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle, + SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -4111,7 +4114,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* pOperator = NULL; code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator); if (pOperator == NULL || code != 0) { - downstream = NULL; + downstream = NULL; QUERY_CHECK_CODE(code, lino, _error); } @@ -4618,8 +4621,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera QUERY_CHECK_CODE(code, lino, _end); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); - code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, + sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -4846,7 +4849,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); if (!pBuff) { - return ; + return; } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); @@ -4944,7 +4947,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); } } - taosMemoryFree(pBuf); + taosMemoryFreeClear(pBuf); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { @@ -4953,6 +4956,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); _end: + taosMemoryFreeClear(pBuf); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } @@ -5001,9 +5005,9 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); pInfo->binfo.pRes = pResBlock; - SExprSupp* pExpSup = &pOperator->exprSupp; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; + SExprSupp* pExpSup = &pOperator->exprSupp; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); @@ -5335,7 +5339,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; SExprInfo* pExprInfo = NULL; code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); @@ -5383,7 +5387,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pState->pFileState = NULL; code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, + &pInfo->pState->pFileState); QUERY_CHECK_CODE(code, lino, _error); pInfo->pOperator = pOperator; @@ -5397,7 +5402,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); - QUERY_CHECK_CODE(code, lino, _error); + QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);