refact META 7

This commit is contained in:
Hongze Cheng 2022-04-21 14:01:58 +00:00
parent 588ff5e968
commit fcedc3430d
4 changed files with 93 additions and 34 deletions

View File

@ -1506,6 +1506,10 @@ typedef struct SVCreateStbReq {
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
typedef struct SVDropStbReq {
// data
} SVDropStbReq;
typedef struct SVCreateStbRsp { typedef struct SVCreateStbRsp {
int code; int code;
} SVCreateStbRsp; } SVCreateStbRsp;

View File

@ -52,7 +52,8 @@ int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions);
int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
// metaTable ================== // metaTable ==================
int metaCreateSTable(SMeta* pMeta, SVCreateStbReq* pReq, SVCreateStbRsp* pRsp); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
// metaCommit ================== // metaCommit ==================
int metaBegin(SMeta* pMeta); int metaBegin(SMeta* pMeta);

View File

@ -15,15 +15,25 @@
#include "vnodeInt.h" #include "vnodeInt.h"
int metaCreateSTable(SMeta *pMeta, SVCreateStbReq *pReq, SVCreateStbRsp *pRsp) { static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version);
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid);
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SSkmDbKey skmDbKey = {0}; SSkmDbKey skmDbKey = {0};
SMetaEntry me = {0}; SMetaEntry me = {0};
int kLen; int kLen = 0;
int vLen; int vLen = 0;
const void *pKey; const void *pKey = NULL;
const void *pVal; const void *pVal = NULL;
void *pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
SCoder coder = {0};
// check name and uid unique {
// TODO: validate request (uid and name unique)
}
// set structs // set structs
me.type = TSDB_SUPER_TABLE; me.type = TSDB_SUPER_TABLE;
@ -38,33 +48,27 @@ int metaCreateSTable(SMeta *pMeta, SVCreateStbReq *pReq, SVCreateStbRsp *pRsp) {
skmDbKey.uid = pReq->suid; skmDbKey.uid = pReq->suid;
skmDbKey.sver = 0; // (TODO) skmDbKey.sver = 0; // (TODO)
// save to table.db (TODO) // save to table.db
pKey = NULL; if (metaSaveToTbDb(pMeta, version, &me) < 0) goto _err;
kLen = 0;
pVal = NULL;
vLen = 0;
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
// save to schema.db // update uid idx
pKey = &skmDbKey; if (metaUpdateUidIdx(pMeta, me.uid, version) < 0) goto _err;
kLen = sizeof(skmDbKey);
pVal = NULL;
vLen = 0;
if (tdbDbInsert(pMeta->pSkmDb, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
// update name.idx // update name.idx
pKey = pReq->name; if (metaUpdateNameIdx(pMeta, me.name, me.uid) < 0) goto _err;
kLen = strlen(pReq->name) + 1;
pVal = &pReq->suid;
vLen = sizeof(tb_uid_t);
if (tdbDbInsert(pMeta->pNameIdx, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
metaDebug("vgId: %d super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
return 0;
_err:
metaError("vgId: %d failed to create super table: %s uid: %" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->suid, tstrerror(terrno));
return -1;
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
// TODO
return 0; return 0;
} }
@ -99,3 +103,53 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
return 0; return 0;
} }
static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME) {
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SCoder coder = {0};
// set key and value
pKey = &version;
kLen = sizeof(version);
if (tEncodeSize(metaEncodeEntry, pME, vLen) < 0) {
goto _err;
}
pVal = taosMemoryMalloc(vLen);
if (pVal == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
if (metaEncodeEntry(&coder, pME) < 0) {
goto _err;
}
tCoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, NULL) < 0) {
goto _err;
}
taosMemoryFree(pVal);
return 0;
_err:
taosMemoryFree(pVal);
return -1;
}
static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version) {
return tdbDbInsert(pMeta->pUidIdx, &uid, sizeof(uid), &version, sizeof(version), NULL);
}
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid) {
return tdbDbInsert(pMeta->pNameIdx, name, strlen(name) + 1, &uid, sizeof(uid), NULL);
}

View File

@ -15,7 +15,7 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
@ -68,7 +68,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
switch (pMsg->msgType) { switch (pMsg->msgType) {
/* META */ /* META */
case TDMT_VND_CREATE_STB: case TDMT_VND_CREATE_STB:
if (vnodeProcessCreateStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_ALTER_STB: case TDMT_VND_ALTER_STB:
if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
@ -214,7 +214,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return 0; return 0;
} }
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len, SRpcMsg *pRsp) { static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SCoder coder; SCoder coder;
@ -231,7 +231,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len, SRpcMsg
goto _err; goto _err;
} }
if (metaCreateSTable(pVnode->pMeta, pReq, NULL) < 0) { if (metaCreateSTable(pVnode->pMeta, version, pReq) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
} }