diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index faaea12b1a..5fd9a4a8c1 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -55,6 +55,8 @@ typedef struct { void* pStateBackend; struct SStorageAPI api; + + int8_t fillHistory; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ad57bdbdc6..7146ba3f5b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -84,7 +84,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { } int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList); - SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; + SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; initStreamStateAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fc3b9a0be4..891fb8dd20 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -791,7 +791,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState}; + SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); @@ -815,7 +815,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); - SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState}; + SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory}; initStorageAPI(&handle.api); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 4042dc45c7..e6c3405d7f 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -129,13 +129,13 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index edda6ee03f..472e7fed0a 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -977,9 +977,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa int32_t size = 0; void* pVal = NULL; int32_t code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size); - ASSERT(code == 0); + // ASSERT(code == 0); if (code == -1) { - // coverity scan + // for history + qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, pKey->win.skey, + pKey->win.ekey, pKey->groupId); pGroupResInfo->index += 1; continue; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 7486a4ad5f..2db5ea2f1e 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -492,13 +492,13 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { - pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); + pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { int32_t children = 0; - pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) { int32_t children = pHandle->numOfVgroups; - pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) { @@ -507,7 +507,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { - pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); + pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a8a0184e4e..aac0660328 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3506,6 +3506,9 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* // clear the existed group id pBlock->info.id.groupId = 0; buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); + if (pBlock->info.rows == 0) { + cleanupGroupResInfo(pGroupResInfo); + } } void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { int32_t size = taosArrayGetSize(pAllWins); @@ -3605,7 +3608,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // if chIndex + 1 - size > 0, add new child for (int32_t i = 0; i < chIndex + 1 - size; i++) { SOperatorInfo* pChildOp = - createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); + createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0, NULL); if (!pChildOp) { T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -3693,7 +3696,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; int32_t code = TSDB_CODE_OUT_OF_MEMORY; @@ -3760,7 +3763,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (!pInfo->historyWins) { goto _error; } - pInfo->isHistoryOp = false; + if (pHandle) { + pInfo->isHistoryOp = pHandle->fillHistory; + } setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -3904,9 +3909,9 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { int32_t code = TSDB_CODE_OUT_OF_MEMORY; - SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo); + SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle); if (pOperator == NULL) { goto _error; } @@ -3930,7 +3935,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); for (int32_t i = 0; i < numOfChild; i++) { - SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); + SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, NULL); if (pChildOp == NULL) { goto _error; } @@ -4305,7 +4310,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; @@ -4369,7 +4374,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys if (!pInfo->historyWins) { goto _error; } - pInfo->isHistoryOp = false; + if (pHandle) { + pInfo->isHistoryOp = pHandle->fillHistory; + } setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 216e3b8a11..9873e7b4c8 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -762,6 +762,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB + qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); return streamStateSessionDel_rocksdb(pState, key); #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number};