feat: partition by tags optimize
This commit is contained in:
parent
b7b64fda84
commit
7f78d4a79b
|
@ -65,6 +65,12 @@ extern "C" {
|
||||||
(list) = NULL; \
|
(list) = NULL; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
#define NODES_CLEAR_LIST(list) \
|
||||||
|
do { \
|
||||||
|
nodesClearList((list)); \
|
||||||
|
(list) = NULL; \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
typedef enum ENodeType {
|
typedef enum ENodeType {
|
||||||
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
|
||||||
// VALUE, OPERATOR, FUNCTION and so on.
|
// VALUE, OPERATOR, FUNCTION and so on.
|
||||||
|
|
|
@ -69,6 +69,7 @@ typedef struct SScanLogicNode {
|
||||||
int16_t tsColId;
|
int16_t tsColId;
|
||||||
double filesFactor;
|
double filesFactor;
|
||||||
SArray* pSmaIndexes;
|
SArray* pSmaIndexes;
|
||||||
|
SNodeList* pPartTags;
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode {
|
||||||
double ratio;
|
double ratio;
|
||||||
int32_t dataRequired;
|
int32_t dataRequired;
|
||||||
SNodeList* pDynamicScanFuncs;
|
SNodeList* pDynamicScanFuncs;
|
||||||
SNodeList* pPartitionKeys;
|
SNodeList* pPartitionTags;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
|
|
@ -2909,7 +2909,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
||||||
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
|
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
|
||||||
|
|
||||||
if (numOfSources == 0) {
|
if (numOfSources == 0) {
|
||||||
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources);
|
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4591,8 +4591,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
|
||||||
if(groupKey == NULL) {
|
if (groupKey == NULL) {
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4601,7 +4601,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
void *keyBuf = NULL;
|
void* keyBuf = NULL;
|
||||||
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
|
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
|
||||||
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
||||||
SColumn* pCol = taosArrayGet(groupKey, j);
|
SColumn* pCol = taosArrayGet(groupKey, j);
|
||||||
|
@ -4616,8 +4616,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||||
STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
metaGetTableEntryByUid(&mr, info->uid);
|
metaGetTableEntryByUid(&mr, info->uid);
|
||||||
|
@ -4627,23 +4627,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
||||||
SColumn* pCol = taosArrayGet(groupKey, j);
|
SColumn* pCol = taosArrayGet(groupKey, j);
|
||||||
|
|
||||||
if(strcmp(pCol->name, "tbname") == 0){
|
if (strcmp(pCol->name, "tbname") == 0) {
|
||||||
isNull[i] = 0;
|
isNull[i] = 0;
|
||||||
memcpy(pStart, mr.me.name, strlen(mr.me.name));
|
memcpy(pStart, mr.me.name, strlen(mr.me.name));
|
||||||
pStart += strlen(mr.me.name);
|
pStart += strlen(mr.me.name);
|
||||||
}else{
|
} else {
|
||||||
STagVal tagVal = {0};
|
STagVal tagVal = {0};
|
||||||
tagVal.cid = pCol->colId;
|
tagVal.cid = pCol->colId;
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
|
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
|
||||||
if(p == NULL){
|
if (p == NULL) {
|
||||||
isNull[j] = 1;
|
isNull[j] = 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
isNull[i] = 0;
|
isNull[i] = 0;
|
||||||
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
// int32_t dataLen = getJsonValueLen(pkey->pData);
|
// int32_t dataLen = getJsonValueLen(pkey->pData);
|
||||||
// memcpy(pStart, (pkey->pData), dataLen);
|
// memcpy(pStart, (pkey->pData), dataLen);
|
||||||
// pStart += dataLen;
|
// pStart += dataLen;
|
||||||
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
|
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
memcpy(pStart, tagVal.pData, tagVal.nData);
|
memcpy(pStart, tagVal.pData, tagVal.nData);
|
||||||
pStart += tagVal.nData;
|
pStart += tagVal.nData;
|
||||||
|
@ -4655,7 +4655,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = (int32_t) (pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
|
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
|
||||||
if (groupId) {
|
if (groupId) {
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
|
||||||
|
@ -4690,16 +4690,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
|
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
|
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
||||||
taosArrayDestroy(groupKeys);
|
taosArrayDestroy(groupKeys);
|
||||||
if (code){
|
if (code) {
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
tsdbCleanupReadHandle(pDataReader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
|
||||||
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
@ -4726,16 +4725,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
|
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
||||||
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
|
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
||||||
taosArrayDestroy(groupKeys);
|
taosArrayDestroy(groupKeys);
|
||||||
if (code){
|
if (code) {
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
tsdbCleanupReadHandle(pDataReader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
|
||||||
createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
|
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
|
@ -4744,7 +4742,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions);
|
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
|
||||||
|
pScanPhyNode->node.pConditions);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4821,7 +4820,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
||||||
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
@ -5061,7 +5060,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||||
if(!pNodeList) return NULL;
|
if (!pNodeList) return NULL;
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -5166,7 +5165,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
|
||||||
|
|
||||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||||
code = doFilterTag(pTagCond, &metaArg, res);
|
code = doFilterTag(pTagCond, &metaArg, res);
|
||||||
if (code == TSDB_CODE_INDEX_REBUILDING){ // todo
|
if (code == TSDB_CODE_INDEX_REBUILDING) { // todo
|
||||||
// doFilter();
|
// doFilter();
|
||||||
} else if (code != TSDB_CODE_SUCCESS) {
|
} else if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
|
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
|
||||||
|
@ -5524,4 +5523,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(watermark);
|
COPY_SCALAR_FIELD(watermark);
|
||||||
COPY_SCALAR_FIELD(tsColId);
|
COPY_SCALAR_FIELD(tsColId);
|
||||||
COPY_SCALAR_FIELD(filesFactor);
|
COPY_SCALAR_FIELD(filesFactor);
|
||||||
|
CLONE_NODE_LIST_FIELD(pPartTags);
|
||||||
return (SNode*)pDst;
|
return (SNode*)pDst;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys
|
||||||
COPY_SCALAR_FIELD(ratio);
|
COPY_SCALAR_FIELD(ratio);
|
||||||
COPY_SCALAR_FIELD(dataRequired);
|
COPY_SCALAR_FIELD(dataRequired);
|
||||||
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
|
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
|
||||||
CLONE_NODE_LIST_FIELD(pPartitionKeys);
|
CLONE_NODE_LIST_FIELD(pPartitionTags);
|
||||||
COPY_SCALAR_FIELD(interval);
|
COPY_SCALAR_FIELD(interval);
|
||||||
COPY_SCALAR_FIELD(offset);
|
COPY_SCALAR_FIELD(offset);
|
||||||
COPY_SCALAR_FIELD(sliding);
|
COPY_SCALAR_FIELD(sliding);
|
||||||
|
|
|
@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
||||||
static const char* jkScanLogicPlanTableId = "TableId";
|
static const char* jkScanLogicPlanTableId = "TableId";
|
||||||
static const char* jkScanLogicPlanTableType = "TableType";
|
static const char* jkScanLogicPlanTableType = "TableType";
|
||||||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
|
static const char* jkScanLogicPlanPartTags = "PartTags";
|
||||||
|
|
||||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||||
|
@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
|
||||||
static const char* jkTableScanPhysiPlanWatermark = "watermark";
|
static const char* jkTableScanPhysiPlanWatermark = "watermark";
|
||||||
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
|
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
|
||||||
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
|
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
|
||||||
|
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
|
||||||
|
|
||||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||||
|
@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
|
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
|
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
|
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
|
||||||
|
@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
|
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
|
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
|
||||||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) &&
|
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
|
||||||
pNode->pParent->pParent &&
|
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) {
|
||||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
|
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
|
||||||
|
@ -222,8 +221,7 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
|
||||||
|
|
||||||
static void setScanWindowInfo(SScanLogicNode* pScan) {
|
static void setScanWindowInfo(SScanLogicNode* pScan) {
|
||||||
SLogicNode* pParent = pScan->node.pParent;
|
SLogicNode* pParent = pScan->node.pParent;
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) &&
|
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent &&
|
||||||
pParent->pParent &&
|
|
||||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
|
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
|
||||||
pParent = pParent->pParent;
|
pParent = pParent->pParent;
|
||||||
}
|
}
|
||||||
|
@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
|
||||||
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
|
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) {
|
||||||
|
*(bool*)pContext = true;
|
||||||
|
return DEAL_RES_END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool partTagsOptHasCol(SNodeList* pPartKeys) {
|
||||||
|
bool hasCol = false;
|
||||||
|
nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol);
|
||||||
|
return hasCol;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
|
SPartitionLogicNode* pPart =
|
||||||
|
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
|
||||||
|
if (NULL == pPart) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
|
||||||
|
TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
|
||||||
|
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
NODES_CLEAR_LIST(pPart->node.pChildren);
|
||||||
|
nodesDestroyNode((SNode*)pPart);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
static const SOptimizeRule optimizeRuleSet[] = {
|
static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
|
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
|
||||||
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
||||||
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
||||||
{.pName = "SmaIndex", .optimizeFunc = smaOptimize}
|
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
|
||||||
|
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||||
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
||||||
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
||||||
if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) {
|
pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
|
||||||
|
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
|
||||||
|
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
|
||||||
nodesDestroyNode((SNode*)pTableScan);
|
nodesDestroyNode((SNode*)pTableScan);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue