From 693942b8215867465bf8dfc38cae9dac4c35cd57 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 15 Jun 2023 14:13:17 +0800 Subject: [PATCH] trans state --- include/libs/executor/executor.h | 3 +++ include/libs/executor/storageapi.h | 1 + include/libs/stream/streamState.h | 2 ++ include/libs/stream/tstream.h | 3 +++ include/libs/stream/tstreamFileState.h | 1 + source/dnode/snode/src/snodeInitApi.c | 1 + source/dnode/vnode/src/tq/tq.c | 2 ++ source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/src/executor.c | 14 +++++++++++++- source/libs/executor/src/projectoperator.c | 15 +++++++++++++++ source/libs/executor/src/timewindowoperator.c | 19 +++++++++++++++---- source/libs/stream/src/streamExec.c | 12 ++++++++++++ source/libs/stream/src/streamState.c | 4 ++++ source/libs/stream/src/tstreamFileState.c | 5 +++++ 14 files changed, 78 insertions(+), 5 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 852257f5df..e03718b571 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -229,6 +229,9 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); void qStreamCloseTsdbReader(void* task); void resetTaskInfo(qTaskInfo_t tinfo); +int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); +int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); + #ifdef __cplusplus } #endif diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 2b4eb06f68..bd31997640 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -392,6 +392,7 @@ typedef struct SStateStore { int32_t (*streamStateCommit)(SStreamState* pState); void (*streamStateDestroy)(SStreamState* pState, bool remove); int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); + void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); } SStateStore; typedef struct SStorageAPI { diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7f9d20a9dd..7747df8595 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -138,6 +138,8 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); +void streamStateReloadInfo(SStreamState* pState, TSKEY ts); + /***compare func **/ typedef struct SStateChekpoint { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3cfde016f0..09583572ed 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -620,6 +620,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp); +int32_t streamTaskReleaseState(SStreamTask* pTask); +int32_t streamTaskReloadState(SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 8496b0ea62..b2255013ca 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -49,6 +49,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); +void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index f5e9245252..c046505630 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -101,6 +101,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy= streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; + pStore->streamStateReloadInfo = streamStateReloadInfo; } void initFunctionStateStore(SFunctionStateStore* pStore) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6f05d67b5e..54afdfed4e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1111,6 +1111,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pStreamTask == NULL) { // todo handle error } + // streamTaskReleaseState(pTask); + // streamTaskReloadState(pStreamTask); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index d2db6368a2..28a88561af 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -203,6 +203,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; + pStore->streamStateReloadInfo = streamStateReloadInfo; } void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f4135f58d1..9542a10389 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1326,4 +1326,16 @@ SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { SOperatorInfo* pOperator = pTaskInfo->pRoot; extractTableList(pArray, pOperator); return pArray; -} \ No newline at end of file +} + +int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo; + pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot); + return 0; +} + +int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo; + pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot); + return 0; +} diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index e7de826d4b..c3459ace23 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -73,6 +73,20 @@ static void destroyIndefinitOperatorInfo(void* param) { taosMemoryFreeClear(param); } +void streamOperatorReleaseState(SOperatorInfo* pOperator) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +void streamOperatorReloadState(SOperatorInfo* pOperator) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -134,6 +148,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 593d735039..50b9ca1c31 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -28,6 +28,7 @@ #define IS_FINAL_OP(op) ((op)->isFinal) #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" #define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" #define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" @@ -2724,8 +2725,10 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { } void streamIntervalReleaseState(SOperatorInfo* pOperator) { - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - return; + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + int32_t resSize = sizeof(TSKEY); + pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); } SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -2737,6 +2740,15 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) { } void streamIntervalReloadState(SOperatorInfo* pOperator) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); + TSKEY ts = *(TSKEY*)pBuf; + pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts); + } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); @@ -3651,7 +3663,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SResultWindowInfo winInfo = {0}; - SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; int32_t size = 0; void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, @@ -4352,7 +4363,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL); - setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamSessionReloadState); + setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 907360dff0..96a10b9cd1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -562,3 +562,15 @@ int32_t streamTryExec(SStreamTask* pTask) { return 0; } + +int32_t streamTaskReleaseState(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + int32_t code = qStreamOperatorReleaseState(exec); + return code; +} + +int32_t streamTaskReloadState(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + int32_t code = qStreamOperatorReloadState(exec); + return code; +} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 967c7733c9..aeb25c2368 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1113,6 +1113,10 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { #endif } +void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { + streamFileStateReloadInfo(pState->pFileState, ts); +} + #if 0 char* streamStateSessionDump(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 2d730f3eda..0799671bce 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -524,3 +524,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { } int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; } + +void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { + pFileState->flushMark = TMAX(pFileState->flushMark, ts); + pFileState->maxTs = TMAX(pFileState->maxTs, ts); +}