From cebc6b66973693c07eeed7ca19db9b00e2fe932e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 10 Nov 2022 14:43:37 +0800 Subject: [PATCH] fix(stream): state and session agg partition tbname --- source/libs/executor/src/timewindowoperator.c | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7b6ed5b67d..46e87527f1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3843,7 +3843,7 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) { return TSDB_CODE_SUCCESS; } -void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { +void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { blockDataCleanup(pBlock); int32_t size = tSimpleHashGetSize(pStDeleted); if (size == 0) { @@ -3869,6 +3869,26 @@ void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** I colDataAppendNULL(pCalStCol, pBlock->info.rows); SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); colDataAppendNULL(pCalEdCol, pBlock->info.rows); + + SHashObj* pGroupIdTbNameMap; + if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOp->info; + pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; + } else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + SStreamStateAggOperatorInfo* pInfo = pOp->info; + pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; + } + + char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t)); + SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + if (tbname == NULL) { + colDataAppendNULL(pTableCol, pBlock->info.rows); + } else { + char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; + STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); + colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + } pBlock->info.rows += 1; } if ((*Ite) == NULL) { @@ -4013,7 +4033,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; @@ -4112,7 +4132,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; @@ -4238,7 +4258,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "semi session delete"); return pInfo->pDelRes; @@ -4312,7 +4332,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "semi session delete"); return pInfo->pDelRes; @@ -4563,7 +4583,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; int64_t maxTs = INT64_MIN; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single state delete"); return pInfo->pDelRes; @@ -4630,7 +4650,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single state delete"); return pInfo->pDelRes;