fix(query): update the column match strategy.
This commit is contained in:
parent
2b803d8b93
commit
6115d3dbe8
|
@ -325,10 +325,15 @@ typedef struct SExchangeInfo {
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
|
#define COL_MATCH_FROM_COL_ID 0x1
|
||||||
|
#define COL_MATCH_FROM_SLOT_ID 0x2
|
||||||
|
|
||||||
typedef struct SColMatchInfo {
|
typedef struct SColMatchInfo {
|
||||||
|
int32_t srcSlotId; // source slot id
|
||||||
int32_t colId;
|
int32_t colId;
|
||||||
int32_t targetSlotId;
|
int32_t targetSlotId;
|
||||||
bool output;
|
bool output;
|
||||||
|
int32_t matchType; // determinate the source according to col id or slot id
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
typedef struct SScanInfo {
|
typedef struct SScanInfo {
|
||||||
|
|
|
@ -4699,10 +4699,9 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* createIndexMap(SNodeList* pNodeList);
|
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
|
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
|
||||||
|
@ -4734,9 +4733,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pColList =
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
|
@ -4761,10 +4761,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo,
|
||||||
pScanPhyNode->node.pConditions);
|
pScanPhyNode->node.pConditions);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
|
@ -4773,18 +4774,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList =
|
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||||
extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||||
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
|
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
|
||||||
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
|
@ -4796,8 +4801,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList =
|
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
|
||||||
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -4869,15 +4873,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColList =
|
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
extractColMatchInfo(pSortPhyNode->pTargets, pSortPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
|
@ -5059,25 +5064,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* createIndexMap(SNodeList* pNodeList) {
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(int32_t));
|
|
||||||
if (pList == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return pList;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);
|
|
||||||
|
|
||||||
SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
|
|
||||||
taosArrayPush(pList, &pColNode->slotId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pList;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
|
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -5090,8 +5077,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||||
|
|
||||||
SColMatchInfo c = {0};
|
SColMatchInfo c = {0};
|
||||||
c.output = true;
|
c.output = true;
|
||||||
c.colId = pColNode->colId;
|
c.colId = pColNode->colId;
|
||||||
|
c.srcSlotId = pColNode->slotId;
|
||||||
|
c.matchType = type;
|
||||||
c.targetSlotId = pNode->slotId;
|
c.targetSlotId = pNode->slotId;
|
||||||
taosArrayPush(pList, &c);
|
taosArrayPush(pList, &c);
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
memcpy(pkey->pData, val, varDataTLen(val));
|
memcpy(pkey->pData, val, varDataTLen(val));
|
||||||
|
ASSERT(varDataTLen(val) <= pkey->bytes);
|
||||||
} else {
|
} else {
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,17 +87,13 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
|
|
||||||
if (p->info.rows > 0) {
|
if (p->info.rows > 0) {
|
||||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
for(int32_t j = 0; j < p->info.numOfCols; ++j) {
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, j);
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
if (pSrc->info.colId == pmInfo->colId) {
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
|
||||||
colDataAssign(pDst, pSrc, p->info.rows);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
|
Loading…
Reference in New Issue