session window max delay

This commit is contained in:
liuyao 2023-11-01 09:48:53 +08:00
parent 4ba45abd01
commit e8aa1555ca
2 changed files with 27 additions and 0 deletions

View File

@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool reCkBlock; bool reCkBlock;
SSDataBlock* pCheckpointRes; SSDataBlock* pCheckpointRes;
bool clearState; bool clearState;
bool recvGetAll;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo { typedef struct SStreamStateAggOperatorInfo {
@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo {
SArray* historyWins; SArray* historyWins;
bool reCkBlock; bool reCkBlock;
SSDataBlock* pCheckpointRes; SSDataBlock* pCheckpointRes;
bool recvGetAll;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo { typedef struct SStreamPartitionOperatorInfo {

View File

@ -2532,6 +2532,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
taosMemoryFree(buf); taosMemoryFree(buf);
} }
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pResInfo = pIte;
pResInfo->pStatePos->beUsed = true;
}
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -2546,6 +2555,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (opRes) { if (opRes) {
return opRes; return opRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -2583,6 +2598,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -2838,6 +2854,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->clearState = false; pInfo->clearState = false;
pInfo->recvGetAll = false;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
// for stream // for stream
void* buff = NULL; void* buff = NULL;
@ -3454,6 +3472,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return resBlock; return resBlock;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3482,6 +3505,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -3713,6 +3737,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
} }
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
// for stream // for stream
void* buff = NULL; void* buff = NULL;