refact META 7

This commit is contained in:
Hongze Cheng 2022-04-22 05:30:15 +00:00
parent 44c5e42ff2
commit 0f3e49d8f7
5 changed files with 99 additions and 7 deletions

View File

@ -45,16 +45,25 @@ typedef struct SMetaEntry SMetaEntry;
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME); int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
// metaTable ==================
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
// metaQuery ==================
typedef struct SMetaEntryReader SMetaEntryReader;
void metaEntryReaderInit(SMetaEntryReader* pReader);
void metaEntryReaderClear(SMetaEntryReader* pReader);
int metaGetTableEntryByVersion(SMeta* pMeta, SMetaEntryReader* pReader, int64_t version);
int metaGetTableEntryByUid(SMeta* pMeta, SMetaEntryReader* pReader, tb_uid_t uid);
int metaGetTableEntryByName(SMeta* pMeta, SMetaEntryReader* pReader, const char* name);
// metaIdx ================== // metaIdx ==================
int metaOpenIdx(SMeta* pMeta); int metaOpenIdx(SMeta* pMeta);
void metaCloseIdx(SMeta* pMeta); void metaCloseIdx(SMeta* pMeta);
int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions); int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions);
int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaTable ==================
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
// metaCommit ================== // metaCommit ==================
int metaBegin(SMeta* pMeta); int metaBegin(SMeta* pMeta);
@ -148,6 +157,13 @@ struct SMetaEntry {
}; };
}; };
struct SMetaEntryReader {
SCoder coder;
SMetaEntry me;
void* pBuf;
int szBuf;
};
#ifndef META_REFACT #ifndef META_REFACT
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);

View File

@ -66,7 +66,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
} }
// open pUidIdx // open pUidIdx
ret = tdbDbOpen("uid.db", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId: %d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;

View File

@ -15,6 +15,57 @@
#include "vnodeInt.h" #include "vnodeInt.h"
void metaEntryReaderInit(SMetaEntryReader *pReader) { memset(pReader, 0, sizeof(*pReader)); }
void metaEntryReaderClear(SMetaEntryReader *pReader) {
tCoderClear(&pReader->coder);
TDB_FREE(pReader->pBuf);
}
int metaGetTableEntryByVersion(SMeta *pMeta, SMetaEntryReader *pReader, int64_t version) {
// query table.db
if (tdbDbGet(pMeta->pTbDb, &version, sizeof(version), &pReader->pBuf, &pReader->szBuf) < 0) {
goto _err;
}
// decode the entry
tCoderInit(&pReader->coder, TD_LITTLE_ENDIAN, pReader->pBuf, pReader->szBuf, TD_DECODER);
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err;
}
return 0;
_err:
return -1;
}
int metaGetTableEntryByUid(SMeta *pMeta, SMetaEntryReader *pReader, tb_uid_t uid) {
int64_t version;
// query uid.idx
if (tdbDbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
return -1;
}
version = *(int64_t *)pReader->pBuf;
return metaGetTableEntryByVersion(pMeta, pReader, version);
}
int metaGetTableEntryByName(SMeta *pMeta, SMetaEntryReader *pReader, const char *name) {
tb_uid_t uid;
// query name.idx
if (tdbDbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
return -1;
}
uid = *(tb_uid_t *)pReader->pBuf;
return metaGetTableEntryByUid(pMeta, pReader, uid);
}
#if 1
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
SMTbCursor *pTbCur = NULL; SMTbCursor *pTbCur = NULL;
#if 0 #if 0
@ -392,3 +443,5 @@ void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) {
#endif #endif
return NULL; return NULL;
} }
#endif

View File

@ -22,6 +22,28 @@ int vnodeQueryOpen(SVnode *pVnode) {
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
STableInfoReq infoReq = {0};
SMetaEntryReader meReader = {0};
int32_t code = 0;
// decode req
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
// query meta
metaEntryReaderInit(&meReader);
if (metaGetTableEntryByName(pVnode->pMeta, &meReader, NULL) < 0) {
goto _exit;
}
// fill response
_exit:
return 0;
#if 0
STbCfg *pTbCfg = NULL; STbCfg *pTbCfg = NULL;
STbCfg *pStbCfg = NULL; STbCfg *pStbCfg = NULL;
tb_uid_t uid; tb_uid_t uid;
@ -147,6 +169,7 @@ _exit:
rpcMsg.code = code; rpcMsg.code = code;
tmsgSendRsp(&rpcMsg); tmsgSendRsp(&rpcMsg);
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -231,7 +231,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err; goto _err;
} }
if (metaCreateSTable(pVnode->pMeta, version, pReq) < 0) { if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
} }