From c941a2c7109ff651877f362b0ae833236565459f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Mar 2022 18:16:40 +0800 Subject: [PATCH] add test for encode/decode ResultRow in group by and interval --- include/libs/nodes/nodes.h | 3 +- source/libs/executor/src/executorimpl.c | 76 ++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 411d34063c..9698f0e13e 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -141,7 +141,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN + QUERY_NODE_PHYSICAL_PLAN, + QUERY_NODE_PHYSICAL_PLAN_GROUPBY } ENodeType; /** diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5d6495b52a..5ec5d8cd70 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6346,7 +6346,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { taosHashClear(pSup->pResultRowHashTable); pOperator->decodeResultRow(pOperator, result, length); if(result){ - free(result); + taosMemoryFree(result); } } @@ -6377,8 +6377,27 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) } static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { - SAggOperatorInfo *pAggInfo = pOperator->info; - SAggSupporter *pSup = &pAggInfo->aggSup; + SAggSupporter *pSup = NULL; + switch(pOperator->operatorType){ + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ + SGroupbyOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + STableIntervalOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + default:{ + qDebug("invalid operatorType: %d", pOperator->operatorType); + } + } int32_t size = taosHashGetSize(pSup->pResultRowHashTable); size_t keyLen = POINTER_BYTES; // estimate the key length @@ -6434,9 +6453,28 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l return false; } - SAggOperatorInfo *pAggInfo = pOperator->info; - SAggSupporter *pSup = &pAggInfo->aggSup; - SOptrBasicInfo *pInfo = &pAggInfo->binfo; + SAggSupporter *pSup = NULL; + switch(pOperator->operatorType){ + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggOperatorInfo *pAggInfo = pOperator->info; + //SOptrBasicInfo *pInfo = &pAggInfo->binfo; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ + SGroupbyOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + STableIntervalOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + default:{ + qDebug("invalid operatorType: %d", pOperator->operatorType); + } + } // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t count = *(int32_t*)(result); @@ -6733,6 +6771,16 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + + char *result = NULL; + int32_t length = 0; + pOperator->encodeResultRow(pOperator, &result, &length); + SAggSupporter *pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pOperator->decodeResultRow(pOperator, result, length); + if(result){ + taosMemoryFree(result); + } } closeAllResultRows(&pInfo->binfo.resultRowInfo); @@ -7149,6 +7197,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); doHashGroupbyAgg(pOperator, pBlock); + + char *result = NULL; + int32_t length = 0; + pOperator->encodeResultRow(pOperator, &result, &length); + SAggSupporter *pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pOperator->decodeResultRow(pOperator, result, length); + if(result){ + taosMemoryFree(result); + } } pOperator->status = OP_RES_TO_RETURN; @@ -7693,6 +7751,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->_openFn = doOpenIntervalAgg; pOperator->getNextFn = doBuildIntervalResult; pOperator->closeFn = destroyIntervalOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -7903,13 +7963,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = OP_Groupby; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); return pOperator;