diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f8ae82ed67..8f55488e2c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -615,7 +615,7 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) } memcpy(pCol->pData, pStart, colLength); - pStart += colLength; + pStart += pCol->info.bytes * capacity; } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b4302787fb..35e4ec408c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -690,8 +690,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); - +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c675a59e89..ce276e432e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7056,6 +7056,7 @@ static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* createSortInfo(SNodeList* pNodeList); +static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { @@ -7165,9 +7166,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; - SArray* pColList = extractColumnInfo(pPartNode->pPartitionKeys); + SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createPartitionOperatorInfo(op, pResBlock, pColList, pTaskInfo, NULL); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); + + return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7265,6 +7270,32 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } +SArray* extractPartitionColInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i); + + // todo extract method + SColumn c = {0}; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.precision = pColNode->node.resType.precision; + c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &c); + } + + return pList; +} + SArray* createSortInfo(SNodeList* pNodeList) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e9b0497587..efba5744d7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -360,7 +360,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); for (int32_t j = 0; j < pBlock->info.rows; ++j) { recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); - int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); @@ -398,9 +397,12 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t* rows = (int32_t*) pPage; - size_t numOfCols = pInfo->binfo.pRes->info.numOfCols; + size_t numOfCols = pOperator->numOfOutput; for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SExprInfo* pExpr = &pOperator->pExpr[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); int32_t bytes = pColInfoData->info.bytes; int32_t startOffset = pInfo->columnOffset[i]; @@ -538,7 +540,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pInfo->columnOffset); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList, +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -561,7 +563,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf)); pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity); - code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -573,7 +574,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; - pOperator->numOfOutput = pResultBlock->info.numOfCols; + pOperator->numOfOutput = numOfCols; + pOperator->pExpr = pExprInfo; pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashPartition;