Merge pull request #24262 from taosdata/fix/TD-28099

reset unclose window info
This commit is contained in:
Haojun Liao 2023-12-29 09:04:31 +08:00 committed by GitHub
commit a662f6d63f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 37 additions and 30 deletions

View File

@ -845,6 +845,7 @@ void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap); void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
int32_t encodeSSessionKey(void** buf, SSessionKey* key); int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -2244,8 +2244,11 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
} }
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); qDebug("%s===stream===%s: Block is Null", taskIdStr, flag);
return;
} else if (pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Empty. type:%d", taskIdStr, flag, pBlock->info.type);
return; return;
} }
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {

View File

@ -1997,7 +1997,6 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
int32_t len = streamScanOperatorEncode(pInfo, &pBuf); int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
pInfo->stateStore.streamStateCommit(pInfo->pState);
} }
// other properties are recovered from the execution plan // other properties are recovered from the execution plan

View File

@ -480,18 +480,18 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
return resBlock; return resBlock;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -561,7 +561,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (resBlock != NULL) { if (resBlock != NULL) {
return resBlock; return resBlock;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }

View File

@ -1223,7 +1223,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
if (!IS_FINAL_INTERVAL_OP(pOperator)) { if (!IS_FINAL_INTERVAL_OP(pOperator)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
@ -2610,18 +2610,18 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return opRes; return opRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -2719,7 +2719,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return opRes; return opRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3002,7 +3002,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer // semi session operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
pInfo->clearState = false; pInfo->clearState = false;
return NULL; return NULL;
} }
@ -3075,7 +3075,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer // semi session operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3552,18 +3552,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return resBlock; return resBlock;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3629,7 +3629,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (resBlock != NULL) { if (resBlock != NULL) {
return resBlock; return resBlock;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3865,11 +3865,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
// printDataBlock(pInfo->pCheckpointRes, "single interval ck"); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3901,7 +3901,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===%s recv|block type STREAM_GET_ALL", getStreamOpName(pOperator->operatorType));
pInfo->recvGetAll = true; pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
@ -4084,3 +4083,8 @@ _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
}