reset update info

This commit is contained in:
54liuyao 2023-12-18 14:59:48 +08:00
parent bf151eafdc
commit 70a8281ef2
1 changed files with 14 additions and 1 deletions

View File

@ -436,6 +436,18 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
return true;
}
void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo* pInfo) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
reloadFromDownStream(downstream->pDownstream[0], pInfo);
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
@ -1401,8 +1413,8 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
}
void streamIntervalReloadState(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
@ -1418,6 +1430,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
reloadFromDownStream(downstream, pInfo);
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,