add stream backend copy
This commit is contained in:
parent
87c78919a9
commit
c692f8c21c
|
@ -1465,9 +1465,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
// qError("open state %p", pInfo->pState);
|
||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
// qError("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
qInfo("open state %p", pInfo->pState);
|
||||
pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
//*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
|
||||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
|
|
|
@ -1117,7 +1117,11 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
|
|||
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
|
||||
|
||||
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
|
||||
dst->pTdbState->pOwner = src->pTdbState->pOwner;
|
||||
if (dst->pFileState == NULL) {
|
||||
dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
|
||||
dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
}
|
||||
dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
|
||||
return;
|
||||
}
|
||||
SStreamStateCur* createStreamStateCursor() {
|
||||
|
|
Loading…
Reference in New Issue