diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 1c909cb47d..cf558be1be 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1081,10 +1081,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SWinKey key = {0}; + SWinKey key = {0}; buf = decodeSWinKey(buf, &key); SRowBuffPos* pPos = NULL; - int32_t resSize = pInfo->aggSup.resultRowSize; + int32_t resSize = pInfo->aggSup.resultRowSize; pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); } @@ -1388,10 +1388,12 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { void* pBuf = NULL; int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); - TSKEY ts = *(TSKEY*)pBuf; - taosMemoryFree(pBuf); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); - pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); + if (code == 0) { + TSKEY ts = *(TSKEY*)pBuf; + taosMemoryFree(pBuf); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); + } } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { @@ -2527,16 +2529,16 @@ void resetWinRange(STimeWindow* winRange) { void streamSessionSemiReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); SResultWindowInfo winInfo = {0}; - int32_t size = 0; - void* pBuf = NULL; - int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); - SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; @@ -2671,7 +2673,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; - // for stream + // for stream void* buff = NULL; int32_t len = 0; int32_t res = @@ -2792,7 +2794,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; - if(pInfo->isHistoryOp) { + if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -2826,8 +2828,9 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->operatorType = pPhyNode->type; if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, - destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, @@ -3738,8 +3741,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, - destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); pInfo->stateStore = pTaskInfo->storageAPI.stateStore;