[td-14493] fix bug in partition by.
This commit is contained in:
parent
28456a034c
commit
e77ecba753
|
@ -615,7 +615,7 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity)
|
|||
}
|
||||
|
||||
memcpy(pCol->pData, pStart, colLength);
|
||||
pStart += colLength;
|
||||
pStart += pCol->info.bytes * capacity;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -690,8 +690,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||
|
|
|
@ -7056,6 +7056,7 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
|
|||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
|
||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||
|
@ -7165,9 +7166,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
|
||||
SArray* pColList = extractColumnInfo(pPartNode->pPartitionKeys);
|
||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
return createPartitionOperatorInfo(op, pResBlock, pColList, pTaskInfo, NULL);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
||||
|
||||
return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
|
||||
|
@ -7265,6 +7270,32 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||
if (pList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
|
||||
|
||||
// todo extract method
|
||||
SColumn c = {0};
|
||||
c.slotId = pColNode->slotId;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.precision = pColNode->node.resType.precision;
|
||||
c.scale = pColNode->node.resType.scale;
|
||||
|
||||
taosArrayPush(pList, &c);
|
||||
}
|
||||
|
||||
return pList;
|
||||
}
|
||||
|
||||
SArray* createSortInfo(SNodeList* pNodeList) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
|
||||
|
|
|
@ -360,7 +360,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
|
||||
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
|
||||
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
|
||||
|
@ -398,9 +397,12 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
int32_t* rows = (int32_t*) pPage;
|
||||
|
||||
size_t numOfCols = pInfo->binfo.pRes->info.numOfCols;
|
||||
size_t numOfCols = pOperator->numOfOutput;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SExprInfo* pExpr = &pOperator->pExpr[i];
|
||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
int32_t startOffset = pInfo->columnOffset[i];
|
||||
|
@ -538,7 +540,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFree(pInfo->columnOffset);
|
||||
}
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -561,7 +563,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc
|
|||
|
||||
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
|
||||
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
|
||||
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -573,7 +574,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc
|
|||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
|
||||
pInfo->binfo.pRes = pResultBlock;
|
||||
pOperator->numOfOutput = pResultBlock->info.numOfCols;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashPartition;
|
||||
|
|
Loading…
Reference in New Issue