diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8cd441798e..06e736e049 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1465,9 +1465,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - // qError("open state %p", pInfo->pState); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - // qError("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); + qInfo("open state %p", pInfo->pState); + pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState); + //*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + + qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 29f6600e16..a5e8c46f18 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1117,7 +1117,11 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { - dst->pTdbState->pOwner = src->pTdbState->pOwner; + if (dst->pFileState == NULL) { + dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); + dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask)); + } + dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; return; } SStreamStateCur* createStreamStateCursor() {