diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3c25d8add4..3620f45408 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -65,6 +65,12 @@ extern "C" { (list) = NULL; \ } while (0) +#define NODES_CLEAR_LIST(list) \ + do { \ + nodesClearList((list)); \ + (list) = NULL; \ + } while (0) + typedef enum ENodeType { // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // VALUE, OPERATOR, FUNCTION and so on. diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 26d055d7d2..749d58b224 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -69,6 +69,7 @@ typedef struct SScanLogicNode { int16_t tsColId; double filesFactor; SArray* pSmaIndexes; + SNodeList* pPartTags; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; - SNodeList* pPartitionKeys; + SNodeList* pPartitionTags; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 15191bf435..ce19d22ffc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "tref.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -21,6 +20,7 @@ #include "querynodes.h" #include "tfill.h" #include "tname.h" +#include "tref.h" #include "tdatablock.h" #include "tglobal.h" @@ -79,7 +79,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define realloc u_realloc #endif -#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) +#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } @@ -2395,7 +2395,7 @@ typedef struct SFetchRspHandleWrapper { } SFetchRspHandleWrapper; int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { - SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param; + SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param; SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); if (pExchangeInfo == NULL) { @@ -2403,7 +2403,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) return TSDB_CODE_SUCCESS; } - int32_t index = pWrapper->sourceIndex; + int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); if (code == TSDB_CODE_SUCCESS) { @@ -2411,9 +2411,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->compLen = htonl(pRsp->compLen); + pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); - pRsp->useconds = htobe64(pRsp->useconds); + pRsp->useconds = htobe64(pRsp->useconds); ASSERT(pRsp != NULL); qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); @@ -2475,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2489,14 +2489,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf } SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); - pWrapper->exchangeId = pExchangeInfo->self; + pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; pMsgSendInfo->param = pWrapper; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgType = TDMT_VND_FETCH; - pMsgSendInfo->fp = loadRemoteDataCallback; + pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); @@ -2645,7 +2645,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", completed:%d try next %d/%"PRIzu, + ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; @@ -2663,8 +2663,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 + ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); completed += 1; @@ -2718,7 +2719,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { int64_t endTs = taosGetTimestampUs(); qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo), - totalSources, (endTs - startTs)/1000.0); + totalSources, (endTs - startTs) / 1000.0); pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; @@ -2849,8 +2850,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.taskId = id; - dataInfo.index = i; - SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + dataInfo.index = i; + SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); if (pDs == NULL) { taosArrayDestroy(pInfo->pSourceDataInfo); return TSDB_CODE_OUT_OF_MEMORY; @@ -2864,7 +2865,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints); if (numOfSources == 0) { - qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources); + qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources); return TSDB_CODE_INVALID_PARA; } @@ -2898,16 +2899,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode tsem_init(&pInfo->ready, 0, 0); - pInfo->seqLoadData = false; - pInfo->pTransporter = pTransporter; - pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); - pOperator->name = "ExchangeOperator"; + pInfo->seqLoadData = false; + pInfo->pTransporter = pTransporter; + pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->numOfExprs = pInfo->pResult->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -4066,7 +4067,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { } void doDestroyExchangeOperatorInfo(void* param) { - SExchangeInfo* pExInfo = (SExchangeInfo*) param; + SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSourceDataInfo); @@ -4554,8 +4555,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI return TSDB_CODE_SUCCESS; } -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){ - if(groupKey == NULL) { +int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) { + if (groupKey == NULL) { return TDB_CODE_SUCCESS; } @@ -4564,11 +4565,11 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TSDB_CODE_OUT_OF_MEMORY; } int32_t keyLen = 0; - void *keyBuf = NULL; + void* keyBuf = NULL; int32_t numOfGroupCols = taosArrayGetSize(groupKey); for (int32_t j = 0; j < numOfGroupCols; ++j) { SColumn* pCol = taosArrayGet(groupKey, j); - keyLen += pCol->bytes; // actual data + null_flag + keyLen += pCol->bytes; // actual data + null_flag } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; @@ -4579,9 +4580,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TSDB_CODE_OUT_OF_MEMORY; } - for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){ - STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i); - SMetaReader mr = {0}; + for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); metaGetTableEntryByUid(&mr, info->uid); @@ -4590,23 +4591,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, for (int32_t j = 0; j < numOfGroupCols; ++j) { SColumn* pCol = taosArrayGet(groupKey, j); - if(strcmp(pCol->name, "tbname") == 0){ + if (strcmp(pCol->name, "tbname") == 0) { isNull[i] = 0; memcpy(pStart, mr.me.name, strlen(mr.me.name)); pStart += strlen(mr.me.name); - }else{ + } else { STagVal tagVal = {0}; tagVal.cid = pCol->colId; const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal); - if(p == NULL){ + if (p == NULL) { isNull[j] = 1; continue; } isNull[i] = 0; if (pCol->type == TSDB_DATA_TYPE_JSON) { -// int32_t dataLen = getJsonValueLen(pkey->pData); -// memcpy(pStart, (pkey->pData), dataLen); -// pStart += dataLen; + // int32_t dataLen = getJsonValueLen(pkey->pData); + // memcpy(pStart, (pkey->pData), dataLen); + // pStart += dataLen; } else if (IS_VAR_DATA_TYPE(pCol->type)) { memcpy(pStart, tagVal.pData, tagVal.nData); pStart += tagVal.nData; @@ -4618,7 +4619,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } - int32_t len = (int32_t) (pStart - (char*)keyBuf); + int32_t len = (int32_t)(pStart - (char*)keyBuf); uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len); if (groupId) { taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t)); @@ -4653,7 +4654,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); if (code) { @@ -4661,7 +4662,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; @@ -4671,11 +4672,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); - generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); + generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); - SOperatorInfo* pOperator = - createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; @@ -4700,16 +4700,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } - SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); - int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); + int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); - if (code){ + if (code) { tsdbCleanupReadHandle(pDataReader); return NULL; } - SOperatorInfo* pOperator = - createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { @@ -4718,7 +4717,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions); + int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, + pScanPhyNode->node.pConditions); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4796,7 +4796,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { - SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -5034,7 +5034,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } SArray* extractPartitionColInfo(SNodeList* pNodeList) { - if(!pNodeList) return NULL; + if (!pNodeList) return NULL; size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); if (pList == NULL) { @@ -5139,7 +5139,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa SArray* res = taosArrayInit(8, sizeof(uint64_t)); code = doFilterTag(pTagCond, &metaArg, res); - if (code == TSDB_CODE_INDEX_REBUILDING){ // todo + if (code == TSDB_CODE_INDEX_REBUILDING) { // todo // doFilter(); } else if (code != TSDB_CODE_SUCCESS) { qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); @@ -5499,4 +5499,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF } return code; } - diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 1ef0ccf7f9..2e30b01357 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(tsColId); COPY_SCALAR_FIELD(filesFactor); + CLONE_NODE_LIST_FIELD(pPartTags); return (SNode*)pDst; } @@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys COPY_SCALAR_FIELD(ratio); COPY_SCALAR_FIELD(dataRequired); CLONE_NODE_LIST_FIELD(pDynamicScanFuncs); - CLONE_NODE_LIST_FIELD(pPartitionKeys); + CLONE_NODE_LIST_FIELD(pPartitionTags); COPY_SCALAR_FIELD(interval); COPY_SCALAR_FIELD(offset); COPY_SCALAR_FIELD(sliding); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 968bb75997..fb6a428d3c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols"; static const char* jkScanLogicPlanTableId = "TableId"; static const char* jkScanLogicPlanTableType = "TableType"; static const char* jkScanLogicPlanTagCond = "TagCond"; +static const char* jkScanLogicPlanPartTags = "PartTags"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags); + } return code; } @@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags); + } return code; } @@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; static const char* jkTableScanPhysiPlanWatermark = "watermark"; static const char* jkTableScanPhysiPlanTsColId = "tsColId"; static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor"; +static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags); + } return code; } @@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs); } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code); @@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags); + } + return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 41b80eaaa8..9d7cd0cf27 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { return false; } if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) || - (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && - pNode->pParent->pParent && - QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) { + (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent && + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) { return true; } return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); @@ -222,9 +221,8 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) { static void setScanWindowInfo(SScanLogicNode* pScan) { SLogicNode* pParent = pScan->node.pParent; - if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && - pParent->pParent && - QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent && + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) { pParent = pParent->pParent; } if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) { @@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) return smaOptimizeImpl(pCxt, pLogicSubplan, pScan); } +static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + +static bool partTagsOptHasCol(SNodeList* pPartKeys) { + bool hasCol = false; + nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol); + return hasCol; +} + +static bool partTagsOptMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { + return false; + } + + return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys); +} + +static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SPartitionLogicNode* pPart = + (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized); + if (NULL == pPart) { + return TSDB_CODE_SUCCESS; + } + + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0); + TSWAP(pPart->pPartitionKeys, pScan->pPartTags); + int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan); + if (TSDB_CODE_SUCCESS == code) { + NODES_CLEAR_LIST(pPart->node.pChildren); + nodesDestroyNode((SNode*)pPart); + } + return code; +} + // clang-format off static const SOptimizeRule optimizeRuleSet[] = { - {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, + {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, - {.pName = "SmaIndex", .optimizeFunc = smaOptimize} + {.pName = "SmaIndex", .optimizeFunc = smaOptimize}, + {.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize} }; // clang-format on diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 08e693da71..2974b3ef8c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); - if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) { + pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags); + if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) || + (NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) { nodesDestroyNode((SNode*)pTableScan); return TSDB_CODE_OUT_OF_MEMORY; }