From 51cd8e9fb77822374cff9b0583874638151d8828 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 28 Dec 2023 15:34:13 +0800 Subject: [PATCH] reset unclose window info --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/executil.c | 7 ++- source/libs/executor/src/scanoperator.c | 1 - .../executor/src/streameventwindowoperator.c | 14 +++--- .../executor/src/streamtimewindowoperator.c | 44 ++++++++++--------- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e3e504cdbc..e98777544b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -845,6 +845,7 @@ void resetWinRange(STimeWindow* winRange); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); void resetUnCloseSessionWinInfo(SSHashObj* winMap); +void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 377de99fc0..825d716926 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2244,8 +2244,11 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr } void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { - if (!pBlock || pBlock->info.rows == 0) { - qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); + if (!pBlock) { + qDebug("%s===stream===%s: Block is Null", taskIdStr, flag); + return; + } else if (pBlock->info.rows == 0) { + qDebug("%s===stream===%s: Block is Empty. type:%d", taskIdStr, flag, pBlock->info.type); return; } if (qDebugFlag & DEBUG_DEBUG) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef2a99d1d1..32407dcf71 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1997,7 +1997,6 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { int32_t len = streamScanOperatorEncode(pInfo, &pBuf); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); taosMemoryFree(pBuf); - pInfo->stateStore.streamStateCommit(pInfo->pState); } // other properties are recovered from the execution plan diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 6adab74344..3868edf645 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -480,18 +480,18 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { return resBlock; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + if (pInfo->reCkBlock) { pInfo->reCkBlock = false; printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pCheckpointRes; } - if (pInfo->recvGetAll) { - pInfo->recvGetAll = false; - resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); - } - - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -561,7 +561,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { if (resBlock != NULL) { return resBlock; } - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index c9490e2c55..a96d0ef633 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1223,7 +1223,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pCheckpointRes; } - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); if (!IS_FINAL_INTERVAL_OP(pOperator)) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer @@ -2609,18 +2609,18 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return opRes; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + if (pInfo->reCkBlock) { pInfo->reCkBlock = false; printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pCheckpointRes; } - if (pInfo->recvGetAll) { - pInfo->recvGetAll = false; - resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); - } - - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -2718,7 +2718,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return opRes; } - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -3001,7 +3001,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); pInfo->clearState = false; return NULL; } @@ -3074,7 +3074,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -3551,18 +3551,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return resBlock; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + if (pInfo->reCkBlock) { pInfo->reCkBlock = false; printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pCheckpointRes; } - if (pInfo->recvGetAll) { - pInfo->recvGetAll = false; - resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); - } - - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -3628,7 +3628,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (resBlock != NULL) { return resBlock; } - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -3864,11 +3864,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->reCkBlock) { pInfo->reCkBlock = false; - // printDataBlock(pInfo->pCheckpointRes, "single interval ck"); + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pCheckpointRes; } - setOperatorCompleted(pOperator); + setStreamOperatorCompleted(pOperator); return NULL; } @@ -3900,7 +3900,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - qDebug("===stream===%s recv|block type STREAM_GET_ALL", getStreamOpName(pOperator->operatorType)); pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; @@ -4083,3 +4082,8 @@ _error: pTaskInfo->code = code; return NULL; } + +void setStreamOperatorCompleted(SOperatorInfo* pOperator) { + setOperatorCompleted(pOperator); + qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); +}