diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 38890f8c34..07507db836 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -503,6 +503,8 @@ typedef struct SStreamSessionAggOperatorInfo { SArray* pUpdated; SSHashObj* pStUpdated; int64_t dataVersion; + SArray* historyWins; + bool isHistoryOp; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -522,6 +524,8 @@ typedef struct SStreamStateAggOperatorInfo { SArray* pUpdated; SSHashObj* pSeUpdated; int64_t dataVersion; + bool isHistoryOp; + SArray* historyWins; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 1d2685b8c6..4042dc45c7 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -35,6 +35,7 @@ typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); +typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr); typedef struct SOperatorFpSet { __optr_open_fn_t _openFn; // DO NOT invoke this function directly @@ -45,6 +46,8 @@ typedef struct SOperatorFpSet { __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; __optr_explain_fn_t getExplainFn; + __optr_state_fn_t releaseStreamStateFn; + __optr_state_fn_t reloadStreamStateFn; } SOperatorFpSet; enum { @@ -143,6 +146,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); +void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn); int32_t optrDummyOpenFn(SOperatorInfo* pOperator); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void setOperatorCompleted(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 730252c7ee..7486a4ad5f 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -38,11 +38,18 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, .closeFn = closeFn, .reqBufFn = reqBufFn, .getExplainFn = explain, + .releaseStreamStateFn = NULL, + .reloadStreamStateFn = NULL, }; return fpSet; } +void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) { + pOperator->fpSet.releaseStreamStateFn = relaseFn; + pOperator->fpSet.reloadStreamStateFn = reloadFn; +} + int32_t optrDummyOpenFn(SOperatorInfo* pOperator) { OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1f2e4cbe39..817b2a8d5f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -44,6 +44,7 @@ int32_t scanDebug = 0; #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define STREAM_SCAN_OP_NAME "StreamScanOperator" +#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -2317,6 +2318,47 @@ static void destroyStreamScanOperatorInfo(void* param) { taosMemoryFree(pStreamScan); } +void streamScanReleaseState(SOperatorInfo* pOperator) { + SStreamScanInfo* pInfo = pOperator->info; + if (!pInfo->pUpdateInfo) { + return; + } + int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); + void* pBuff = taosMemoryCalloc(1, len); + pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len); +} + +void streamScanReloadState(SOperatorInfo* pOperator) { + SStreamScanInfo* pInfo = pOperator->info; + void* pBuff = NULL; + int32_t len = 0; + pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), &pBuff, &len); + SUpdateInfo* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); + int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo); + if (code == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) { + if (pInfo->pUpdateInfo->minTS < 0) { + pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); + pInfo->pUpdateInfo = pUpInfo; + } else { + pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS); + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); + SHashObj* curMap = pInfo->pUpdateInfo->pMap; + void *pIte = taosHashIterate(curMap, NULL); + while (pIte != NULL) { + size_t keySize = 0; + int64_t* pUid = taosHashGetKey(pIte, &keySize); + taosHashPut(pUpInfo->pMap, pUid, sizeof(int64_t), pIte, sizeof(TSKEY)); + pIte = taosHashIterate(curMap, pIte); + } + taosHashCleanup(curMap); + pInfo->pUpdateInfo->pMap = pUpInfo->pMap; + pUpInfo->pMap = NULL; + pInfo->stateStore.updateInfoDestroy(pUpInfo); + } + } +} + SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SArray* pColIds = NULL; @@ -2489,6 +2531,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7463475a54..593d735039 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -28,6 +28,8 @@ #define IS_FINAL_OP(op) ((op)->isFinal) #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" +#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" typedef struct SStateWindowInfo { SResultWindowInfo winInfo; @@ -2721,6 +2723,26 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { return size; } +void streamIntervalReleaseState(SOperatorInfo* pOperator) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { + return; + } + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + pAPI->stateStore.streamStateCommit(pInfo->pState); + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +void streamIntervalReloadState(SOperatorInfo* pOperator) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2830,6 +2852,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { initIntervalDownStream(downstream, pPhyNode->type, pInfo); } @@ -3463,6 +3486,26 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pBlock->info.id.groupId = 0; buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } +void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { + int32_t size = taosArrayGetSize(pAllWins); + if (size == 0) { + return; + } + + SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1); + taosArrayPush(pMaxWins, pSeKey); + if (pSeKey->groupId == 0) { + return; + } + uint64_t preGpId = pSeKey->groupId; + for (int32_t i = size - 2; i >= 0; i--) { + pSeKey = taosArrayGet(pAllWins, i); + if (preGpId != pSeKey->groupId) { + taosArrayPush(pMaxWins, pSeKey); + preGpId = pSeKey->groupId; + } + } +} static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; @@ -3563,6 +3606,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; + if(pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -3589,6 +3635,42 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return NULL; } +void streamSessionReleaseState(SOperatorInfo* pOperator) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + } + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +void streamSessionReloadState(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SResultWindowInfo winInfo = {0}; + SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + ASSERT(size == num * sizeof(SSessionKey)); + for (int32_t i = 0; i < num; i++) { + SResultWindowInfo winInfo = {0}; + setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -3653,11 +3735,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pUpdated = NULL; pInfo->pStUpdated = NULL; pInfo->dataVersion = 0; + pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); + if (!pInfo->historyWins) { + goto _error; + } + pInfo->isHistoryOp = false; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); @@ -3765,7 +3853,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; - initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -3815,7 +3902,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); } - + setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->operatorType = pPhyNode->type; @@ -3879,6 +3966,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { } bool compareStateKey(void* data, void* key) { + if (!data || !key) { + return true; + } SStateKeys* stateKey = (SStateKeys*)key; stateKey->pData = (char*)key + sizeof(SStateKeys); return compareVal(data, stateKey); @@ -3902,7 +3992,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS) { pCurWin->winInfo.isOutput = true; - } else { + } else if (pKeyData) { if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) { varDataCopy(pCurWin->pStateKey->pData, pKeyData); } else { @@ -4100,6 +4190,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pSeUpdated); pInfo->pSeUpdated = NULL; + if(pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -4125,6 +4219,68 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return NULL; } +void streamStateReleaseState(SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, + SSHashObj* pStUpdated, SSHashObj* pStDeleted) { + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + SResultRow* pWinResult = NULL; + initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true); + compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); + tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); + if (pNextWin->isOutput && pStDeleted) { + saveDeleteRes(pStDeleted, pNextWin->sessionWin); + } + removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); + doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); + taosMemoryFree(pNextWin->pOutputBuf); + saveSessionOutputBuf(pAggSup, pCurWin); +} + +void streamStateReloadState(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, + strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + ASSERT(size == num * sizeof(SSessionKey)); + for (int32_t i = 0; i < num; i++) { + SStateWindowInfo curInfo = {0}; + SStateWindowInfo nextInfo = {0}; + setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); + if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { + compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + } + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode; @@ -4186,11 +4342,17 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pUpdated = NULL; pInfo->pSeUpdated = NULL; pInfo->dataVersion = 0; + pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); + if (!pInfo->historyWins) { + goto _error; + } + pInfo->isHistoryOp = false; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamSessionReloadState); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5017,6 +5179,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false;