[td-13039] fix bug in sort.
This commit is contained in:
parent
d537387a3b
commit
b771748eb2
|
@ -653,7 +653,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||||
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
||||||
|
|
|
@ -69,6 +69,7 @@ typedef enum SResultTsInterpType {
|
||||||
typedef struct SColMatchInfo {
|
typedef struct SColMatchInfo {
|
||||||
int32_t colId;
|
int32_t colId;
|
||||||
int32_t targetSlotId;
|
int32_t targetSlotId;
|
||||||
|
bool output;
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -330,15 +331,16 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
|
||||||
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
pBlock->info.blockId = pNode->dataBlockId;
|
pBlock->info.blockId = pNode->dataBlockId;
|
||||||
pBlock->info.rowSize = pNode->resultRowSize;
|
pBlock->info.rowSize = pNode->resultRowSize; // todo ??
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData idata = {{0}};
|
SColumnInfoData idata = {{0}};
|
||||||
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
|
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
|
||||||
idata.info.type = pDescNode->dataType.type;
|
if (!pDescNode->output) {
|
||||||
if (pDescNode->dataType.bytes > 1000) {
|
continue;
|
||||||
pDescNode->dataType.bytes = 4;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
idata.info.type = pDescNode->dataType.type;
|
||||||
idata.info.bytes = pDescNode->dataType.bytes;
|
idata.info.bytes = pDescNode->dataType.bytes;
|
||||||
idata.info.scale = pDescNode->dataType.scale;
|
idata.info.scale = pDescNode->dataType.scale;
|
||||||
idata.info.slotId = pDescNode->slotId;
|
idata.info.slotId = pDescNode->slotId;
|
||||||
|
@ -2863,16 +2865,19 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfCols = pBlock->info.numOfCols;
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||||
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||||
ASSERT(pColMatchInfo->colId == p->info.colId);
|
if (!pColMatchInfo->output) {
|
||||||
|
continue;
|
||||||
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||||
|
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
@ -6249,7 +6254,7 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -8474,17 +8479,17 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractScanColumnId(SNodeList* pNodeList);
|
static SArray* extractScanColumnId(SNodeList* pNodeList);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
|
int32_t numOfCols = 0;
|
||||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
|
|
||||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo);
|
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
|
||||||
|
@ -8523,15 +8528,13 @@ static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
assert(size == 1);
|
assert(size == 1);
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||||
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
|
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
|
||||||
}
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
assert(size == 1);
|
assert(size == 1);
|
||||||
|
@ -8584,12 +8587,9 @@ static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t num = 0;
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num);
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
|
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
|
||||||
return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo);
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
assert(size == 1);
|
assert(size == 1);
|
||||||
|
@ -8730,7 +8730,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList) {
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -8743,12 +8743,25 @@ SArray* extractColMatchInfo(SNodeList* pNodeList) {
|
||||||
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
|
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
|
||||||
|
|
||||||
SColMatchInfo c = {0};
|
SColMatchInfo c = {0};
|
||||||
c.colId = pColNode->colId;
|
c.colId = pColNode->colId;
|
||||||
c.targetSlotId = pNode->slotId;
|
c.targetSlotId = pNode->slotId;
|
||||||
|
c.output = true;
|
||||||
taosArrayPush(pList, &c);
|
taosArrayPush(pList, &c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*numOfOutputCols = 0;
|
||||||
|
|
||||||
|
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SSlotDescNode* pNode = (SSlotDescNode*) nodesListGetNode(pOutputNodeList->pSlots, i);
|
||||||
|
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
|
||||||
|
if (!pNode->output) {
|
||||||
|
info->output = false;
|
||||||
|
} else {
|
||||||
|
(*numOfOutputCols) += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -414,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
|
||||||
|
|
||||||
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
|
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
|
if (numOfSources == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate the I/O counts to complete the data sort.
|
// Calculate the I/O counts to complete the data sort.
|
||||||
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
|
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
|
||||||
|
@ -602,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
||||||
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
|
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (numOfSources == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
|
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue