Merge pull request #24110 from taosdata/fix/TD-27903

reset update info
This commit is contained in:
Haojun Liao 2023-12-18 16:03:22 +08:00 committed by GitHub
commit dad26d645a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 1 deletions

View File

@ -436,6 +436,18 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
return true; return true;
} }
void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo* pInfo) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
reloadFromDownStream(downstream->pDownstream[0], pInfo);
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
@ -1401,8 +1413,8 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
} }
void streamIntervalReloadState(SOperatorInfo* pOperator) { void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
int32_t size = 0; int32_t size = 0;
void* pBuf = NULL; void* pBuf = NULL;
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
@ -1418,6 +1430,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadFromDownStream(downstream, pInfo);
} }
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,