refact meta

This commit is contained in:
Hongze Cheng 2022-04-24 06:19:12 +00:00
parent cc9496e5a0
commit 588fd84853
6 changed files with 58 additions and 57 deletions

View File

@ -31,6 +31,8 @@
#include "tmsg.h" #include "tmsg.h"
#include "trow.h" #include "trow.h"
#include "tdbInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -65,20 +67,19 @@ typedef struct SMeta SMeta; // todo: remove
typedef struct SMetaReader SMetaReader; typedef struct SMetaReader SMetaReader;
typedef struct SMetaEntry SMetaEntry; typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags); void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags);
void metaReaderClear(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader);
int metaReadNext(SMetaReader *pReader); int metaReadNext(SMetaReader *pReader);
const SMetaEntry *metaReaderGetEntry(SMetaReader *pReader);
#if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg; typedef SVCreateTSmaReq SSmaCfg;
#if 1
typedef struct SMTbCursor SMTbCursor; typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur); int metaTbCursorNext(SMTbCursor *pTbCur);
#endif #endif
// tsdb // tsdb
@ -210,6 +211,15 @@ struct SMetaReader {
int szBuf; int szBuf;
}; };
struct SMTbCursor {
TDBC *pDbc;
void *pKey;
void *pVal;
int kLen;
int vLen;
SMetaReader mr;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -75,68 +75,59 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
} }
int metaReadNext(SMetaReader *pReader) { int metaReadNext(SMetaReader *pReader) {
SMeta *pMeta = pReader->pMeta;
// TODO // TODO
return 0; return 0;
} }
const SMetaEntry *metaReaderGetEntry(SMetaReader *pReader) { return &pReader->me; }
#if 1 // =================================================== #if 1 // ===================================================
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
SMTbCursor *pTbCur = NULL; SMTbCursor *pTbCur = NULL;
#if 0
SMetaDB *pDB = pMeta->pDB;
pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur)); pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
if (pTbCur == NULL) { if (pTbCur == NULL) {
return NULL; return NULL;
} }
tdbDbcOpen(pDB->pTbDB, &pTbCur->pDbc); metaReaderInit(&pTbCur->mr, pMeta->pVnode, 0);
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc);
#endif
return pTbCur; return pTbCur;
} }
void metaCloseTbCursor(SMTbCursor *pTbCur) { void metaCloseTbCursor(SMTbCursor *pTbCur) {
#if 0
if (pTbCur) { if (pTbCur) {
TDB_FREE(pTbCur->pKey);
TDB_FREE(pTbCur->pVal);
metaReaderClear(&pTbCur->mr);
if (pTbCur->pDbc) { if (pTbCur->pDbc) {
tdbDbcClose(pTbCur->pDbc); tdbDbcClose(pTbCur->pDbc);
} }
taosMemoryFree(pTbCur); taosMemoryFree(pTbCur);
} }
#endif
} }
char *metaTbCursorNext(SMTbCursor *pTbCur) { int metaTbCursorNext(SMTbCursor *pTbCur) {
#if 0
void *pKey = NULL;
void *pVal = NULL;
int kLen;
int vLen;
int ret; int ret;
void *pBuf; void *pBuf;
STbCfg tbCfg; STbCfg tbCfg;
for (;;) { for (;;) {
ret = tdbDbNext(pTbCur->pDbc, &pKey, &kLen, &pVal, &vLen); ret = tdbDbNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
if (ret < 0) break; if (ret < 0) {
pBuf = pVal; return -1;
metaDecodeTbInfo(pBuf, &tbCfg);
if (tbCfg.type == META_SUPER_TABLE) {
taosMemoryFree(tbCfg.name);
taosMemoryFree(tbCfg.stbCfg.pTagSchema);
continue;
} else if (tbCfg.type == META_CHILD_TABLE) {
kvRowFree(tbCfg.ctbCfg.pTag);
} }
return tbCfg.name; metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == META_SUPER_TABLE) {
continue;
}
} }
#endif return 0;
return NULL;
} }
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {

View File

@ -47,7 +47,7 @@ struct SMetaDB {
#endif #endif
}; };
#pragma pack(push,1) #pragma pack(push, 1)
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int32_t sver; int32_t sver;
@ -406,10 +406,6 @@ static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_
return pSchemaWrapper; return pSchemaWrapper;
} }
struct SMTbCursor {
TDBC *pDbc;
};
struct SMCtbCursor { struct SMCtbCursor {
TDBC *pCur; TDBC *pCur;
tb_uid_t suid; tb_uid_t suid;

View File

@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#ifndef TDENGINE_EXECUTORIMPL_H #ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H
@ -38,6 +39,8 @@ extern "C" {
#include "tmsg.h" #include "tmsg.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
@ -379,7 +382,7 @@ typedef struct SSysTableScanInfo {
int32_t accountId; int32_t accountId;
bool showRewrite; bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void* pCur; // cursor for iterate the local table meta store. SMTbCursor* pCur; // cursor for iterate the local table meta store.
SArray* scanCols; // SArray<int16_t> scan column id list SArray* scanCols; // SArray<int16_t> scan column id list
// int32_t type; // show type, TODO remove it // int32_t type; // show type, TODO remove it

View File

@ -376,8 +376,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
} }
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval,
double sampleRatio, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
@ -390,19 +391,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
return NULL; return NULL;
} }
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio; pInfo->sampleRatio = sampleRatio;
pInfo->dataBlockLoadFlag= dataLoadFlag; pInfo->dataBlockLoadFlag = dataLoadFlag;
pInfo->pResBlock = pResBlock; pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition; pInfo->pFilterNode = pCondition;
pInfo->dataReader = pDataReader; pInfo->dataReader = pDataReader;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->order = order; pInfo->order = order;
pInfo->current = 0; pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo; pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
@ -828,8 +829,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
char n[TSDB_TABLE_NAME_LEN] = {0}; char n[TSDB_TABLE_NAME_LEN] = {0};
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { while (metaTbCursorNext(pInfo->pCur) == 0) {
STR_TO_VARSTR(n, name); STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
colDataAppend(pTableNameCol, numOfRows, n, false); colDataAppend(pTableNameCol, numOfRows, n, false);
numOfRows += 1; numOfRows += 1;
if (numOfRows >= pInfo->capacity) { if (numOfRows >= pInfo->capacity) {

View File

@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST}) ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
executorTest executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(