From 588fd84853d544d2a137ba4f376d9f469e763b53 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 24 Apr 2022 06:19:12 +0000 Subject: [PATCH] refact meta --- source/dnode/vnode/inc/vnode.h | 22 ++++++++--- source/dnode/vnode/src/meta/metaQuery.c | 45 +++++++++-------------- source/dnode/vnode/src/meta/metaTDBImpl.c | 6 +-- source/libs/executor/inc/executorimpl.h | 5 ++- source/libs/executor/src/scanoperator.c | 35 +++++++++--------- source/libs/executor/test/CMakeLists.txt | 2 +- 6 files changed, 58 insertions(+), 57 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b35dc394b2..fae30065ae 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -31,6 +31,8 @@ #include "tmsg.h" #include "trow.h" +#include "tdbInt.h" + #ifdef __cplusplus extern "C" { #endif @@ -65,20 +67,19 @@ typedef struct SMeta SMeta; // todo: remove typedef struct SMetaReader SMetaReader; typedef struct SMetaEntry SMetaEntry; -void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags); -void metaReaderClear(SMetaReader *pReader); -int metaReadNext(SMetaReader *pReader); -const SMetaEntry *metaReaderGetEntry(SMetaReader *pReader); +void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags); +void metaReaderClear(SMetaReader *pReader); +int metaReadNext(SMetaReader *pReader); +#if 1 // refact APIs below (TODO) typedef SVCreateTbReq STbCfg; typedef SVCreateTSmaReq SSmaCfg; -#if 1 typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -char *metaTbCursorNext(SMTbCursor *pTbCur); +int metaTbCursorNext(SMTbCursor *pTbCur); #endif // tsdb @@ -210,6 +211,15 @@ struct SMetaReader { int szBuf; }; +struct SMTbCursor { + TDBC *pDbc; + void *pKey; + void *pVal; + int kLen; + int vLen; + SMetaReader mr; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 059b5c9001..d8030a42c0 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -75,68 +75,59 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { } int metaReadNext(SMetaReader *pReader) { + SMeta *pMeta = pReader->pMeta; + // TODO + return 0; } -const SMetaEntry *metaReaderGetEntry(SMetaReader *pReader) { return &pReader->me; } - #if 1 // =================================================== SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *pTbCur = NULL; -#if 0 - SMetaDB *pDB = pMeta->pDB; pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur)); if (pTbCur == NULL) { return NULL; } - tdbDbcOpen(pDB->pTbDB, &pTbCur->pDbc); + metaReaderInit(&pTbCur->mr, pMeta->pVnode, 0); + + tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc); -#endif return pTbCur; } void metaCloseTbCursor(SMTbCursor *pTbCur) { -#if 0 if (pTbCur) { + TDB_FREE(pTbCur->pKey); + TDB_FREE(pTbCur->pVal); + metaReaderClear(&pTbCur->mr); if (pTbCur->pDbc) { tdbDbcClose(pTbCur->pDbc); } taosMemoryFree(pTbCur); } -#endif } -char *metaTbCursorNext(SMTbCursor *pTbCur) { -#if 0 - void *pKey = NULL; - void *pVal = NULL; - int kLen; - int vLen; +int metaTbCursorNext(SMTbCursor *pTbCur) { int ret; void *pBuf; STbCfg tbCfg; for (;;) { - ret = tdbDbNext(pTbCur->pDbc, &pKey, &kLen, &pVal, &vLen); - if (ret < 0) break; - pBuf = pVal; - metaDecodeTbInfo(pBuf, &tbCfg); - if (tbCfg.type == META_SUPER_TABLE) { - taosMemoryFree(tbCfg.name); - taosMemoryFree(tbCfg.stbCfg.pTagSchema); - continue; - } else if (tbCfg.type == META_CHILD_TABLE) { - kvRowFree(tbCfg.ctbCfg.pTag); + ret = tdbDbNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen); + if (ret < 0) { + return -1; } - return tbCfg.name; + metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey); + if (pTbCur->mr.me.type == META_SUPER_TABLE) { + continue; + } } -#endif - return NULL; + return 0; } STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index fe5c866dc5..d7a0fb5885 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -47,7 +47,7 @@ struct SMetaDB { #endif }; -#pragma pack(push,1) +#pragma pack(push, 1) typedef struct { tb_uid_t uid; int32_t sver; @@ -406,10 +406,6 @@ static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_ return pSchemaWrapper; } -struct SMTbCursor { - TDBC *pDbc; -}; - struct SMCtbCursor { TDBC *pCur; tb_uid_t suid; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5adfb7caca..c215dbf3ab 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +// clang-format off #ifndef TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H @@ -38,6 +39,8 @@ extern "C" { #include "tmsg.h" #include "tpagedbuf.h" +#include "vnode.h" + typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) @@ -379,7 +382,7 @@ typedef struct SSysTableScanInfo { int32_t accountId; bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database - void* pCur; // cursor for iterate the local table meta store. + SMTbCursor* pCur; // cursor for iterate the local table meta store. SArray* scanCols; // SArray scan column id list // int32_t type; // show type, TODO remove it diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3a9742d48a..d7d3a3698e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -376,8 +376,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { } SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, - int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, - SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { + int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, + SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, + double sampleRatio, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -390,19 +391,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int return NULL; } - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; - pInfo->dataBlockLoadFlag= dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; + pInfo->dataBlockLoadFlag = dataLoadFlag; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; @@ -828,8 +829,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { int32_t numOfRows = 0; char n[TSDB_TABLE_NAME_LEN] = {0}; - while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { - STR_TO_VARSTR(n, name); + while (metaTbCursorNext(pInfo->pCur) == 0) { + STR_TO_VARSTR(n, pInfo->pCur->mr.me.name); colDataAppend(pTableNameCol, numOfRows, n, false); numOfRows += 1; if (numOfRows >= pInfo->capacity) { diff --git a/source/libs/executor/test/CMakeLists.txt b/source/libs/executor/test/CMakeLists.txt index b1f379585b..129509d6c6 100644 --- a/source/libs/executor/test/CMakeLists.txt +++ b/source/libs/executor/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(executorTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( executorTest - PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes + PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode ) TARGET_INCLUDE_DIRECTORIES(