From eea32f5f0ae23b5c649f1367bf1f5034a00daaf1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 May 2022 22:21:04 +0800 Subject: [PATCH 1/8] feat: add tag condition --- source/dnode/vnode/inc/vnode.h | 4 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 70 ++++--------------------- source/libs/executor/src/executorimpl.c | 26 ++++----- 3 files changed, 25 insertions(+), 75 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9e33973c05..c106dbfb68 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -103,9 +103,7 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab void *pMemRef); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); bool isTsdbCacheLastRow(tsdbReaderT *pReader); -int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len, - int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo, - SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); +int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ee216cb2ab..d390b59153 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "index.h" #define EXTRA_BYTES 2 #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -3875,74 +3876,25 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd // return TSDB_CODE_SUCCESS; //} -int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, - int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, - SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { - SMetaReader mr = {0}; - - metaReaderInit(&mr, (SMeta*)pMeta, 0); - - if (metaGetTableEntryByUid(&mr, uid) < 0) { - tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); - metaReaderClear(&mr); - terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; - goto _error; - } else { - tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); - } - - if (mr.me.type != TSDB_SUPER_TABLE) { - tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, - reqId); - terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client - metaReaderClear(&mr); - goto _error; - } - - metaReaderClear(&mr); - +int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond) { // NOTE: not add ref count for super table - SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true); - - // no tags and tbname condition, all child tables of this stable are involved - if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { - int32_t ret = getAllTableList(pMeta, uid, res); - if (ret != TSDB_CODE_SUCCESS) { - goto _error; - } - - pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); - pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); - - tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64 - " QID:0x%" PRIx64, - pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); - - taosArrayDestroy(res); - return ret; - } + SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); int32_t ret = TSDB_CODE_SUCCESS; - - SFilterInfo* filterInfo = NULL; - ret = filterInitFromNode((SNode*)pTagCond, &filterInfo, 0); + if(pTagCond){ + ret = doFilterTag(pTagCond, res); + }else{ + ret = getAllTableList(pMeta, uid, res); + } if (ret != TSDB_CODE_SUCCESS) { - terrno = ret; return ret; } - ret = tsdbQueryTableList(pMeta, res, filterInfo); + pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); - pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); + pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); + taosArrayPush(pGroupInfo->pGroupList, &res); - // tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, - // pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); - - taosArrayDestroy(res); return ret; - -_error: - return terrno; } int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 581cf5cacd..25c807d3c4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4438,10 +4438,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT } static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId); + STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, - uint64_t queryId, uint64_t taskId); + uint64_t queryId, uint64_t taskId, SNode* pTagCond); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractColumnInfo(SNodeList* pNodeList); @@ -4471,14 +4471,14 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { + uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; } @@ -4502,9 +4502,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { - pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond); } if (pDataReader == NULL && terrno != 0) { @@ -4551,7 +4551,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); + queryId, taskId, pTagCond); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4577,7 +4577,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, pTagCond); if (ops[i] == NULL) { return NULL; } @@ -4893,10 +4893,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod } int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, - uint64_t queryId, uint64_t taskId) { + uint64_t queryId, uint64_t taskId, SNode* pTagCond) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { - code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId); + code = tsdbQueryAllTable(metaHandle, tableUid, pGroupInfo, pTagCond); } else { // Create one table group. code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo); } @@ -4923,10 +4923,10 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { } tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId) { + STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { uint64_t uid = pTableScanNode->scan.uid; int32_t code = - doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId); + doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4962,7 +4962,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } (*pTaskInfo)->pRoot = - createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo); + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; From 71ec636c3c505b34dc51cef69bf9f4110c869eba Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 May 2022 22:29:24 +0800 Subject: [PATCH 2/8] feat: add tag condition --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 3 +-- source/libs/executor/src/executorimpl.c | 19 +++++++++++++++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c106dbfb68..88629e6c2a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -103,7 +103,7 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab void *pMemRef); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); bool isTsdbCacheLastRow(tsdbReaderT *pReader); -int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond); +int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d390b59153..0ccfbf82c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -14,7 +14,6 @@ */ #include "tsdb.h" -#include "index.h" #define EXTRA_BYTES 2 #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -2804,7 +2803,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int return numOfRows; } -static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { +int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); while (1) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 25c807d3c4..1e7f737ab4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -34,6 +34,7 @@ #include "thash.h" #include "ttypes.h" #include "vnode.h" +#include "index.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -4894,9 +4895,23 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; if (tableType == TSDB_SUPER_TABLE) { - code = tsdbQueryAllTable(metaHandle, tableUid, pGroupInfo, pTagCond); + SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); + + int32_t ret = ; + if(pTagCond){ + code = doFilterTag(pTagCond, res); + }else{ + code = tsdbGetAllTableList(metaHandle, tableUid, res); + } + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); + pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); + taosArrayPush(pGroupInfo->pGroupList, &res); } else { // Create one table group. code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo); } From 4825d48019d998cdfb40e9d082efe80778c3e773 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 May 2022 22:34:34 +0800 Subject: [PATCH 3/8] feat: add tag condition --- source/dnode/vnode/src/tsdb/tsdbRead.c | 28 -------------------------- 1 file changed, 28 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0ccfbf82c8..a5e3f7f73b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -146,8 +146,6 @@ typedef struct STableGroupSupporter { SSchema* pTagSchema; } STableGroupSupporter; -int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo); - static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList); static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); @@ -3875,32 +3873,6 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd // return TSDB_CODE_SUCCESS; //} -int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond) { - // NOTE: not add ref count for super table - SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - - int32_t ret = TSDB_CODE_SUCCESS; - if(pTagCond){ - ret = doFilterTag(pTagCond, res); - }else{ - ret = getAllTableList(pMeta, uid, res); - } - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); - pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - taosArrayPush(pGroupInfo->pGroupList, &res); - - return ret; -} - -int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) { - // impl later - - return TSDB_CODE_SUCCESS; -} int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { SMetaReader mr = {0}; From 8db0315341307357536661086fa985c17f7ba0e8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 May 2022 22:35:24 +0800 Subject: [PATCH 4/8] feat: add tag condition --- source/libs/executor/src/executorimpl.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1e7f737ab4..1683ab6eb4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4899,7 +4899,6 @@ int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUi if (tableType == TSDB_SUPER_TABLE) { SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - int32_t ret = ; if(pTagCond){ code = doFilterTag(pTagCond, res); }else{ From 29db3bad7952f793ea9573589664ad3026fc5d61 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 May 2022 22:42:44 +0800 Subject: [PATCH 5/8] feat: add tag condition --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a5e3f7f73b..28677fd589 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2801,7 +2801,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int return numOfRows; } -int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { +int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); while (1) { From 051020f9326e6926e12871bbed993283743b52a0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 26 May 2022 16:05:27 +0800 Subject: [PATCH 6/8] fix: remove table group code in 2.x --- include/common/tcommon.h | 5 +- source/dnode/vnode/inc/vnode.h | 7 +- source/dnode/vnode/src/inc/vnodeInt.h | 5 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 392 +++++++----------- source/dnode/vnode/src/vnd/vnodeQuery.c | 8 +- source/libs/executor/inc/executorimpl.h | 29 +- source/libs/executor/src/executorimpl.c | 213 ++++------ source/libs/executor/src/groupoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 15 +- source/libs/executor/src/timewindowoperator.c | 15 +- 10 files changed, 267 insertions(+), 426 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0ff13963c0..45745403f3 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -53,10 +53,9 @@ typedef enum EStreamType { } EStreamType; typedef struct { - uint32_t numOfTables; - SArray* pGroupList; + SArray* pTableList; SHashObj* map; // speedup acquire the tableQueryInfo by table uid -} STableGroupInfo; +} STableListInfo; typedef struct SColumnDataAgg { int16_t colId; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e8de74e639..28c9b8518f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -99,9 +99,9 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); -tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, +tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, void *pMemRef); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); bool isTsdbCacheLastRow(tsdbReaderT *pReader); @@ -112,9 +112,6 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockI int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); -int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); -int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); // tq diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index faf0ddcd4a..d38ff716ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -114,11 +114,10 @@ int tsdbCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); -tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); -tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, void* pMemRef); -int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo); int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 28677fd589..c0b97f7536 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -146,8 +146,8 @@ typedef struct STableGroupSupporter { SSchema* pTagSchema; } STableGroupSupporter; -static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList); -static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList); +static STimeWindow updateLastrowForEachGroup(STableListInfo* pList); +static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); // static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); @@ -233,41 +233,34 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { return rows; } -static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) { - size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); - assert(numOfGroup >= 1); +static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) { + size_t tableSize = taosArrayGetSize(pTableList->pTableList); + assert(tableSize >= 1); // allocate buffer in order to load data blocks from file - SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo)); + SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo)); if (pTableCheckInfo == NULL) { return NULL; } // todo apply the lastkey of table check to avoid to load header file - for (int32_t i = 0; i < numOfGroup; ++i) { - SArray* group = *(SArray**)taosArrayGet(pGroupList->pGroupList, i); + for (int32_t j = 0; j < tableSize; ++j) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j); - size_t gsize = taosArrayGetSize(group); - assert(gsize > 0); - - for (int32_t j = 0; j < gsize; ++j) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(group, j); - - STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; - if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { - info.lastKey = pTsdbReadHandle->window.skey; - } - - assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey); - } else { + STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; + if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { info.lastKey = pTsdbReadHandle->window.skey; } - taosArrayPush(pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, - info.lastKey, pTsdbReadHandle->idStr); + assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey); + } else { + info.lastKey = pTsdbReadHandle->window.skey; } + + taosArrayPush(pTableCheckInfo, &info); + tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, + info.lastKey, pTsdbReadHandle->idStr); } // TODO group table according to the tag value. @@ -478,7 +471,7 @@ _end: return NULL; } -tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -490,7 +483,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG } // todo apply the lastkey of table check to avoid to load header file - pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); + pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList); if (pTsdbReadHandle->pTableCheckInfo == NULL) { // tsdbCleanupReadHandle(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -520,8 +513,8 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG } } - tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle, - taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList), + tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle, + taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pTsdbReadHandle->idStr); return (tsdbReaderT)pTsdbReadHandle; @@ -565,7 +558,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { resetCheckInfo(pTsdbReadHandle); } -void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) { +void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList) { STsdbReadHandle* pTsdbReadHandle = queryHandle; pTsdbReadHandle->order = pCond->order; @@ -607,21 +600,21 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, uint64_t taskId) { - pCond->twindow = updateLastrowForEachGroup(groupList); + pCond->twindow = updateLastrowForEachGroup(pList); // no qualified table - if (groupList->numOfTables == 0) { + if (taosArrayGetSize(pList->pTableList) == 0) { return NULL; } - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, groupList, qId, taskId); + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } - int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList); + int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList); if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; return NULL; @@ -667,60 +660,60 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { } // leave only one table for each group -static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) { - assert(pGroupList); - size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); +//static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) { +// assert(pGroupList); +// size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); +// +// STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo)); +// pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES); +// +// for (int32_t i = 0; i < numOfGroup; ++i) { +// SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i); +// size_t numOfTables = taosArrayGetSize(oneGroup); +// +// SArray* px = taosArrayInit(4, sizeof(STableKeyInfo)); +// for (int32_t j = 0; j < numOfTables; ++j) { +// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j); +// // if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) { +// // taosArrayPush(px, pInfo); +// // pNew->numOfTables += 1; +// // break; +// // } +// } +// +// // there are no data in this group +// if (taosArrayGetSize(px) == 0) { +// taosArrayDestroy(px); +// } else { +// taosArrayPush(pNew->pGroupList, &px); +// } +// } +// +// return pNew; +//} - STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo)); - pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES); - - for (int32_t i = 0; i < numOfGroup; ++i) { - SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i); - size_t numOfTables = taosArrayGetSize(oneGroup); - - SArray* px = taosArrayInit(4, sizeof(STableKeyInfo)); - for (int32_t j = 0; j < numOfTables; ++j) { - STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j); - // if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) { - // taosArrayPush(px, pInfo); - // pNew->numOfTables += 1; - // break; - // } - } - - // there are no data in this group - if (taosArrayGetSize(px) == 0) { - taosArrayDestroy(px); - } else { - taosArrayPush(pNew->pGroupList, &px); - } - } - - return pNew; -} - -tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, - uint64_t qId, uint64_t taskId) { - STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); - - if (pNew->numOfTables == 0) { - tsdbDebug("update query time range to invalidate time window"); - - assert(taosArrayGetSize(pNew->pGroupList) == 0); - bool asc = ASCENDING_TRAVERSE(pCond->order); - if (asc) { - pCond->twindow.ekey = pCond->twindow.skey - 1; - } else { - pCond->twindow.skey = pCond->twindow.ekey - 1; - } - } - - STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId); - pTsdbReadHandle->loadExternalRow = true; - pTsdbReadHandle->currentLoadExternalRows = true; - - return pTsdbReadHandle; -} +//tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, +// uint64_t qId, uint64_t taskId) { +// STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); +// +// if (pNew->numOfTables == 0) { +// tsdbDebug("update query time range to invalidate time window"); +// +// assert(taosArrayGetSize(pNew->pGroupList) == 0); +// bool asc = ASCENDING_TRAVERSE(pCond->order); +// if (asc) { +// pCond->twindow.ekey = pCond->twindow.skey - 1; +// } else { +// pCond->twindow.skey = pCond->twindow.ekey - 1; +// } +// } +// +// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId); +// pTsdbReadHandle->loadExternalRow = true; +// pTsdbReadHandle->currentLoadExternalRows = true; +// +// return pTsdbReadHandle; +//} static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) { if (pCheckInfo->initBuf) { @@ -3338,8 +3331,8 @@ bool isTsdbCacheLastRow(tsdbReaderT* pReader) { return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE; } -int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) { - assert(pTsdbReadHandle != NULL && groupList != NULL); +int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) { + assert(pTsdbReadHandle != NULL && tableList != NULL); // TSKEY key = TSKEY_INITIAL_VAL; // @@ -3386,68 +3379,68 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) { return code; } -STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) { +STimeWindow updateLastrowForEachGroup(STableListInfo* pList) { STimeWindow window = {INT64_MAX, INT64_MIN}; - int32_t totalNumOfTable = 0; - SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t)); - - // NOTE: starts from the buffer in case of descending timestamp order check data blocks - size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); - for (int32_t j = 0; j < numOfGroups; ++j) { - SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); - TSKEY key = TSKEY_INITIAL_VAL; - - STableKeyInfo keyInfo = {0}; - - size_t numOfTables = taosArrayGetSize(pGroup); - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); - - // if the lastKey equals to INT64_MIN, there is no data in this table - TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; - if (key < lastKey) { - key = lastKey; - - // keyInfo.pTable = pInfo->pTable; - keyInfo.lastKey = key; - pInfo->lastKey = key; - - if (key < window.skey) { - window.skey = key; - } - - if (key > window.ekey) { - window.ekey = key; - } - } - } - - // more than one table in each group, only one table left for each group - // if (keyInfo.pTable != NULL) { - // totalNumOfTable++; - // if (taosArrayGetSize(pGroup) == 1) { - // // do nothing - // } else { - // taosArrayClear(pGroup); - // taosArrayPush(pGroup, &keyInfo); - // } - // } else { // mark all the empty groups, and remove it later - // taosArrayDestroy(pGroup); - // taosArrayPush(emptyGroup, &j); - // } - } - - // window does not being updated, so set the original - if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { - window = TSWINDOW_INITIALIZER; - assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); - } - - taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); - taosArrayDestroy(emptyGroup); - - groupList->numOfTables = totalNumOfTable; +// int32_t totalNumOfTable = 0; +// SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t)); +// +// // NOTE: starts from the buffer in case of descending timestamp order check data blocks +// size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); +// for (int32_t j = 0; j < numOfGroups; ++j) { +// SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); +// TSKEY key = TSKEY_INITIAL_VAL; +// +// STableKeyInfo keyInfo = {0}; +// +// size_t numOfTables = taosArrayGetSize(pGroup); +// for (int32_t i = 0; i < numOfTables; ++i) { +// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); +// +// // if the lastKey equals to INT64_MIN, there is no data in this table +// TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; +// if (key < lastKey) { +// key = lastKey; +// +// // keyInfo.pTable = pInfo->pTable; +// keyInfo.lastKey = key; +// pInfo->lastKey = key; +// +// if (key < window.skey) { +// window.skey = key; +// } +// +// if (key > window.ekey) { +// window.ekey = key; +// } +// } +// } +// +// // more than one table in each group, only one table left for each group +// // if (keyInfo.pTable != NULL) { +// // totalNumOfTable++; +// // if (taosArrayGetSize(pGroup) == 1) { +// // // do nothing +// // } else { +// // taosArrayClear(pGroup); +// // taosArrayPush(pGroup, &keyInfo); +// // } +// // } else { // mark all the empty groups, and remove it later +// // taosArrayDestroy(pGroup); +// // taosArrayPush(emptyGroup, &j); +// // } +// } +// +// // window does not being updated, so set the original +// if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { +// window = TSWINDOW_INITIALIZER; +// assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); +// } +// +// taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); +// taosArrayDestroy(emptyGroup); +// +// groupList->numOfTables = totalNumOfTable; return window; } @@ -3873,81 +3866,6 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd // return TSDB_CODE_SUCCESS; //} -int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { - SMetaReader mr = {0}; - - metaReaderInit(&mr, (SMeta*)pMeta, 0); - - if (metaGetTableEntryByUid(&mr, uid) < 0) { - terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; - goto _error; - } - - metaReaderClear(&mr); - - pGroupInfo->numOfTables = 1; - pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - - SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); - - STableKeyInfo info = {.lastKey = startKey, .uid = uid}; - taosArrayPush(group, &info); - - taosArrayPush(pGroupInfo->pGroupList, &group); - return TSDB_CODE_SUCCESS; - -_error: - metaReaderClear(&mr); - return terrno; -} - -#if 0 -int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { - if (tsdbRLockRepoMeta(tsdb) < 0) { - return terrno; - } - - assert(pTableIdList != NULL); - size_t size = taosArrayGetSize(pTableIdList); - pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); - - for(int32_t i = 0; i < size; ++i) { - STableIdInfo *id = taosArrayGet(pTableIdList, i); - - STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid); - if (pTable == NULL) { - tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid); - continue; - } - - if (pTable->type == TSDB_SUPER_TABLE) { - tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid); - terrno = TSDB_CODE_QRY_INVALID_MSG; - tsdbUnlockRepoMeta(tsdb); - taosArrayDestroy(group); - return terrno; - } - - STableKeyInfo info = {.pTable = pTable, .lastKey = id->key}; - taosArrayPush(group, &info); - } - - if (tsdbUnlockRepoMeta(tsdb) < 0) { - taosArrayDestroy(group); - return terrno; - } - - pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(group); - if (pGroupInfo->numOfTables > 0) { - taosArrayPush(pGroupInfo->pGroupList, &group); - } else { - taosArrayDestroy(group); - } - - return TSDB_CODE_SUCCESS; -} -#endif static void* doFreeColumnInfoData(SArray* pColumnInfoData) { if (pColumnInfoData == NULL) { return NULL; @@ -4018,30 +3936,6 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { } #if 0 -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { - assert(pGroupList != NULL); - - size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); - - for(int32_t i = 0; i < numOfGroup; ++i) { - SArray* p = taosArrayGetP(pGroupList->pGroupList, i); - - size_t numOfTables = taosArrayGetSize(p); - for(int32_t j = 0; j < numOfTables; ++j) { - STable* pTable = taosArrayGetP(p, j); - if (pTable != NULL) { // in case of handling retrieve data from tsdb - tsdbUnRefTable(pTable); - } - //assert(pTable != NULL); - } - - taosArrayDestroy(p); - } - - taosHashCleanup(pGroupList->map); - taosArrayDestroy(pGroupList->pGroupList); - pGroupList->numOfTables = 0; -} static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) { SSkipListIterator* iter = tSkipListCreateIter(pSkipList); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3b47b90254..8317834316 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -147,16 +147,10 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { } // wrapper of tsdb read interface -tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, +tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo* tableList, uint64_t qId, void *pMemRef) { #if 0 return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef); #endif return 0; -} -int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo) { -#if 0 - return tsdbGetTableGroupFromIdListT(pVnode->pTsdb, pTableIdList, pGroupInfo); -#endif - return 0; } \ No newline at end of file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 53dddd9c22..e4daae4e7c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -49,7 +49,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) -#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) +//#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) @@ -153,7 +153,7 @@ typedef struct STaskAttr { int32_t numOfFilterCols; int64_t* fillVal; void* tsdb; - STableGroupInfo tableGroupInfo; // table list SArray +// STableListInfo tableGroupInfo; // table list int32_t vgId; } STaskAttr; @@ -193,7 +193,7 @@ typedef struct SExecTaskInfo { int32_t tversion; } schemaVer; - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + STableListInfo tableqinfoList; // this is a table list char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -215,7 +215,7 @@ typedef struct STaskRuntimeEnv { STSCursor cur; char* tagVal; // tag value of current data block - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure +// STableGroupInfo tableqinfoGroupInfo; // this is a table list struct SOperatorInfo* proot; SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value @@ -344,7 +344,7 @@ typedef struct STagScanInfo { SArray *pColMatchInfo; int32_t curPos; SReadHandle readHandle; - STableGroupInfo *pTableGroups; + STableListInfo *pTableList; } STagScanInfo; typedef enum EStreamScanMode { @@ -707,7 +707,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, @@ -720,21 +720,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, - SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo); + SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, @@ -748,14 +746,13 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo); + SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, @@ -771,8 +768,6 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win); - bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8c6c6f0f64..f0291e3c5b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -88,7 +88,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #endif #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) +//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } @@ -1846,12 +1846,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } -STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) { - STableQueryInfo* pTableQueryInfo = buf; - pTableQueryInfo->lastKey = win.skey; - return pTableQueryInfo; -} - void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; @@ -2431,7 +2425,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } -static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); +static void doDestroyTableList(STableListInfo* pTableqinfoList); static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { #if 0 @@ -3999,35 +3993,30 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { } } -static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { - if (pTableGroupInfo->numOfTables == 0) { - return NULL; - } - - STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); - if (pTableQueryInfo == NULL) { - return NULL; - } - - int32_t index = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { - SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); - for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { - STableKeyInfo* pk = taosArrayGet(pa, j); - STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; - pTQueryInfo->lastKey = pk->lastKey; - } - } - - STimeWindow win = {0, INT64_MAX}; - createTableQueryInfo(pTableQueryInfo, win); - return pTableQueryInfo; -} +//static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) { +// int32_t size = taosArrayGetSize(pTableListInfo->pTableList); +// if (size == 0) { +// return NULL; +// } +// +// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo)); +// if (pTableQueryInfo == NULL) { +// return NULL; +// } +// +// for (int32_t j = 0; j < size; ++j) { +// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j); +// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j]; +// pTQueryInfo->lastKey = pk->lastKey; +// } +// +// pTableQueryInfo->lastKey = 0; +// return pTableQueryInfo; +//} SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -4040,7 +4029,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultSizeInfo(pOperator, numOfRows); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); - pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4438,11 +4426,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT } static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); + STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); -static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, - uint64_t queryId, uint64_t taskId, SNode* pTagCond); -static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); +static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); +static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); @@ -4471,14 +4458,24 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { + uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); + int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + return NULL; + } + + tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; } @@ -4500,11 +4497,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; + int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + return NULL; + } + tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { - pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); - } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond); + pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); } if (pDataReader == NULL && terrno != 0) { @@ -4517,7 +4522,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - SArray* tableIdList = extractTableIdList(pTableGroupInfo); + SArray* tableIdList = extractTableIdList(pTableListInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); SArray* pCols = @@ -4550,8 +4555,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId, pTagCond); + int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4564,7 +4568,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = - createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); + createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo); return pOperator; } else { ASSERT(0); @@ -4577,7 +4581,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, pTagCond); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pTagCond); if (ops[i] == NULL) { return NULL; } @@ -4606,10 +4610,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pAggNode->pGroupKeys != NULL) { SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); + pScalarExprInfo, numOfScalarExpr, pTaskInfo); } else { pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, - pTaskInfo, pTableGroupInfo); + pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -4628,8 +4632,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .calTrigger = pIntervalPhyNode->window.triggerType}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTableGroupInfo, - pTaskInfo); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -4678,7 +4681,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; @@ -4892,75 +4895,57 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return pList; } -int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, - uint64_t queryId, uint64_t taskId, SNode* pTagCond) { +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, + STableListInfo* pListInfo, SNode* pTagCond) { int32_t code = TSDB_CODE_SUCCESS; + pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); + if (tableType == TSDB_SUPER_TABLE) { - SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - if(pTagCond){ + SArray* res = taosArrayInit(8, sizeof(uint64_t)); code = doFilterTag(pTagCond, res); + if (code != TSDB_CODE_SUCCESS) { + qError("doFilterTag error:%d", code); + taosArrayDestroy(res); + return code; + } + for(int i = 0; i < taosArrayGetSize(res); i++){ + STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; + taosArrayPush(pListInfo->pTableList, &info); + } + taosArrayDestroy(res); }else{ - code = tsdbGetAllTableList(metaHandle, tableUid, res); + code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); } - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); - pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - taosArrayPush(pGroupInfo->pGroupList, &res); } else { // Create one table group. - code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo); + STableKeyInfo info = {.lastKey = 0, .uid = tableUid}; + taosArrayPush(pListInfo->pTableList, &info); } return code; } -SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { +SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); - if (pTableGroupInfo->numOfTables > 0) { - SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, 0); - ASSERT(taosArrayGetSize(pTableGroupInfo->pGroupList) == 1); - - // Transfer the Array of STableKeyInfo into uid list. - size_t numOfTables = taosArrayGetSize(pa); - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pkeyInfo = taosArrayGet(pa, i); - taosArrayPush(tableIdList, &pkeyInfo->uid); - } + // Transfer the Array of STableKeyInfo into uid list. + for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { + STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); + taosArrayPush(tableIdList, &pkeyInfo->uid); } return tableIdList; } tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { - uint64_t uid = pTableScanNode->scan.uid; - int32_t code = - doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (pTableGroupInfo->numOfTables == 0) { - code = 0; - qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); - goto _error; - } - + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { SQueryTableDataCond cond = {0}; - code = initQueryTableDataCond(&cond, pTableScanNode); + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - goto _error; + return NULL; } - return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId); - -_error: - terrno = code; - return NULL; + return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); } int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, @@ -4975,7 +4960,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } (*pTaskInfo)->pRoot = - createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; @@ -5034,34 +5019,18 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { taosMemoryFree(pFilter); } -static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { - if (pTableqinfoGroupInfo->pGroupList != NULL) { - int32_t numOfGroups = (int32_t)taosArrayGetSize(pTableqinfoGroupInfo->pGroupList); - for (int32_t i = 0; i < numOfGroups; ++i) { - SArray* p = taosArrayGetP(pTableqinfoGroupInfo->pGroupList, i); +static void doDestroyTableList(STableListInfo* pTableqinfoList) { + taosArrayDestroy(pTableqinfoList->pTableList); + taosHashCleanup(pTableqinfoList->map); - size_t num = taosArrayGetSize(p); - for (int32_t j = 0; j < num; ++j) { - STableQueryInfo* item = taosArrayGetP(p, j); - destroyTableQueryInfoImpl(item); - } - - taosArrayDestroy(p); - } - } - - taosArrayDestroy(pTableqinfoGroupInfo->pGroupList); - taosHashCleanup(pTableqinfoGroupInfo->map); - - pTableqinfoGroupInfo->pGroupList = NULL; - pTableqinfoGroupInfo->map = NULL; - pTableqinfoGroupInfo->numOfTables = 0; + pTableqinfoList->pTableList = NULL; + pTableqinfoList->map = NULL; } void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); + doDestroyTableList(&pTaskInfo->tableqinfoList); destroyOperatorInfo(pTaskInfo->pRoot); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2600c17060..dfcd494144 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -346,7 +346,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -616,7 +616,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SExecTaskInfo* pTaskInfo) { SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 17238bbd9b..2833c54e2e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1610,20 +1610,19 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SExprInfo* pExprInfo = &pOperator->pExpr[0]; SSDataBlock* pRes = pInfo->pRes; - if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) { + int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); + if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; } - SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0); - char str[512] = {0}; int32_t count = 0; SMetaReader mr = {0}; metaReaderInit(&mr, pInfo->readHandle.meta, 0); - while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos); + while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { + STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); metaGetTableEntryByUid(&mr, item->uid); for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { @@ -1655,7 +1654,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } count += 1; - if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { + if (++pInfo->curPos >= size) { doSetOperatorCompleted(pOperator); } } @@ -1680,14 +1679,14 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, - STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableGroups = pTableGroupInfo; + pInfo->pTableList = pTableListInfo; pInfo->pColMatchInfo = pColMatchInfo; pInfo->pRes = pResBlock; pInfo->readHandle = *pReadHandle; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9346dbf54a..9e4db1870b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1167,8 +1167,7 @@ bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1192,8 +1191,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API - // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1228,8 +1226,7 @@ _error: SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1285,8 +1282,7 @@ _error: SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1308,8 +1304,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); - // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + if (code != TSDB_CODE_SUCCESS) { goto _error; } From 750b7ccec98a6da4add836a8011715d61de67fe4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 26 May 2022 21:45:49 +0800 Subject: [PATCH 7/8] fix: remove table group code in 2.x --- source/libs/executor/src/timewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6ba8f08a26..47dda8fc2b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1264,7 +1264,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, for (int32_t i = 0; i < numOfChild; i++) { SSDataBlock* chRes = createOneDataBlock(pResBlock, false); SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, - chRes, pInterval, primaryTsSlotId, pTwAggSupp, NULL, pTaskInfo); + chRes, pInterval, primaryTsSlotId, pTwAggSupp, pTaskInfo); if (pChildOp && chRes) { taosArrayPush(pInfo->pChildren, &pChildOp); continue; From 3e94a856297902e517ef7b0053ae703d117ebb53 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 26 May 2022 23:05:06 +0800 Subject: [PATCH 8/8] fix: remove table group code in 2.x --- source/libs/executor/src/executorimpl.c | 48 ++++++++++++------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 13f7ff8024..684c657d17 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4434,7 +4434,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT } static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); + STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo); @@ -4473,16 +4473,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); - return NULL; - } - tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; @@ -4505,19 +4495,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; - int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); - return NULL; - } - tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); + } else { + getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond); } if (pDataReader == NULL && terrno != 0) { @@ -4915,6 +4897,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, if (code != TSDB_CODE_SUCCESS) { qError("doFilterTag error:%d", code); taosArrayDestroy(res); + terrno = code; return code; } for(int i = 0; i < taosArrayGetSize(res); i++){ @@ -4946,14 +4929,29 @@ SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { } tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { - SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { + int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + code = 0; + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + goto _error; + } + + SQueryTableDataCond cond = {0}; + code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + +_error: + terrno = code; + return NULL; } int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,