From 8357c7342b3d6601bfdb636446171178ec35c254 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 19 Oct 2022 11:05:00 +0800 Subject: [PATCH] fix(stream): stream state and session support partition tbname --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 30 +++++++++++++++++-- source/libs/executor/src/tfill.c | 2 ++ source/libs/executor/src/timewindowoperator.c | 14 ++++++++- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 47450328be..1aa3d55efc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); -int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 641bcb0c5b..0e74b56882 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4312,8 +4312,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in return TSDB_CODE_SUCCESS; } -int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock, +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -4338,6 +4339,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta if (pBlock->info.groupId == 0) { pBlock->info.groupId = pKey->groupId; + + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } else { + ASSERT(0); + } + } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pKey->groupId) { @@ -4383,4 +4409,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta } blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e199cf8406..599e46c5fc 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { case STREAM_NORMAL: case STREAM_INVALID: { doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); + memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = 0; } break; default: @@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { doStreamFillImpl(pOperator); doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); + memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 76fddc3982..add5f44847 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup // clear the existed group id pBlock->info.groupId = 0; - buildSessionResultDataBlock(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); + buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo); } static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { @@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ } + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); @@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single state recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));