refact more meta
This commit is contained in:
parent
44dc05f38f
commit
ad9fd5ec26
|
@ -2193,8 +2193,8 @@ static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
|
||||||
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
||||||
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
|
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
|
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1;
|
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
|
||||||
if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1;
|
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -2202,8 +2202,8 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
|
||||||
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
|
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
|
||||||
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
|
||||||
if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1;
|
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
|
||||||
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,7 @@ void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
|
||||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||||
|
|
||||||
struct SMetaEntry {
|
struct SMetaEntry {
|
||||||
|
int64_t version;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
const char* name;
|
const char* name;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
|
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI64(pCoder, pME->version) < 0) return -1;
|
||||||
if (tEncodeI8(pCoder, pME->type) < 0) return -1;
|
if (tEncodeI8(pCoder, pME->type) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pME->uid) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->uid) < 0) return -1;
|
||||||
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
||||||
|
@ -45,6 +46,7 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
|
||||||
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
|
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pME->type) < 0) return -1;
|
if (tDecodeI8(pCoder, &pME->type) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pME->uid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->uid) < 0) return -1;
|
||||||
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
||||||
|
|
|
@ -15,13 +15,14 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME);
|
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version);
|
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid);
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME);
|
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME);
|
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid);
|
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaSaveToSkmDb(SMeta *pMeta, tb_uid_t uid, SSchemaWrapper *pSW);
|
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
|
||||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
|
@ -39,23 +40,14 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set structs
|
// set structs
|
||||||
|
me.version = version;
|
||||||
me.type = TSDB_SUPER_TABLE;
|
me.type = TSDB_SUPER_TABLE;
|
||||||
me.uid = pReq->suid;
|
me.uid = pReq->suid;
|
||||||
me.name = pReq->name;
|
me.name = pReq->name;
|
||||||
me.stbEntry.schema = pReq->schema;
|
me.stbEntry.schema = pReq->schema;
|
||||||
me.stbEntry.schemaTag = pReq->schemaTag;
|
me.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
|
||||||
// save to table.db
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
if (metaSaveToTbDb(pMeta, version, &me) < 0) goto _err;
|
|
||||||
|
|
||||||
// save to schema.db (TODO)
|
|
||||||
if (metaSaveToSkmDb(pMeta, pReq->suid, &pReq->schema) < 0) goto _err;
|
|
||||||
|
|
||||||
// update uid idx
|
|
||||||
if (metaUpdateUidIdx(pMeta, me.uid, version) < 0) goto _err;
|
|
||||||
|
|
||||||
// update name.idx
|
|
||||||
if (metaUpdateNameIdx(pMeta, me.name, me.uid) < 0) goto _err;
|
|
||||||
|
|
||||||
metaDebug("vgId: %d super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
metaDebug("vgId: %d super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
||||||
|
|
||||||
|
@ -105,16 +97,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
me.ntbEntry.schema = pReq->ntb.schema;
|
me.ntbEntry.schema = pReq->ntb.schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
// save table
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
if (me.type == TSDB_CHILD_TABLE) {
|
|
||||||
if (metaCreateChildTable(pMeta, version, &me) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (metaCreateNormalTable(pMeta, version, &me) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
metaDebug("vgId:%d table %s uid %" PRId64 " is created", TD_VID(pMeta->pVnode), pReq->name, pReq->uid);
|
metaDebug("vgId:%d table %s uid %" PRId64 " is created", TD_VID(pMeta->pVnode), pReq->name, pReq->uid);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -141,7 +124,7 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME) {
|
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
STbDbKey tbDbKey;
|
STbDbKey tbDbKey;
|
||||||
void *pKey = NULL;
|
void *pKey = NULL;
|
||||||
void *pVal = NULL;
|
void *pVal = NULL;
|
||||||
|
@ -150,7 +133,7 @@ static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME)
|
||||||
SCoder coder = {0};
|
SCoder coder = {0};
|
||||||
|
|
||||||
// set key and value
|
// set key and value
|
||||||
tbDbKey.version = version;
|
tbDbKey.version = pME->version;
|
||||||
tbDbKey.uid = pME->uid;
|
tbDbKey.uid = pME->uid;
|
||||||
|
|
||||||
pKey = &tbDbKey;
|
pKey = &tbDbKey;
|
||||||
|
@ -187,71 +170,113 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version) {
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbDbInsert(pMeta->pUidIdx, &uid, sizeof(uid), &version, sizeof(version), NULL);
|
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid) {
|
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbDbInsert(pMeta->pNameIdx, name, strlen(name) + 1, &uid, sizeof(uid), NULL);
|
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid) {
|
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
STtlIdxKey ttlKey = {.dtime = dtime, .uid = uid};
|
int32_t ttlDays;
|
||||||
|
int64_t ctime;
|
||||||
|
STtlIdxKey ttlKey;
|
||||||
|
|
||||||
|
if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
|
ctime = pME->ctbEntry.ctime;
|
||||||
|
ttlDays = pME->ctbEntry.ttlDays;
|
||||||
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
|
ctime = pME->ntbEntry.ctime;
|
||||||
|
ttlDays = pME->ntbEntry.ttlDays;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ttlDays <= 0) return 0;
|
||||||
|
|
||||||
|
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
|
||||||
|
ttlKey.uid = pME->uid;
|
||||||
|
|
||||||
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, NULL);
|
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) {
|
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
|
||||||
|
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) {
|
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
int64_t dtime;
|
SCoder coder = {0};
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen = 0;
|
||||||
|
int rcode = 0;
|
||||||
|
SSkmDbKey skmDbKey = {0};
|
||||||
|
const SSchemaWrapper *pSW;
|
||||||
|
|
||||||
// save to table.db
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
if (metaSaveToTbDb(pMeta, version, pME) < 0) return -1;
|
pSW = &pME->stbEntry.schema;
|
||||||
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
// save to schema.db
|
pSW = &pME->ntbEntry.schema;
|
||||||
if (metaSaveToSkmDb(pMeta, pME->uid, &pME->ntbEntry.schema) < 0) return -1;
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
// update uid.idx
|
|
||||||
if (metaUpdateUidIdx(pMeta, pME->uid, version) < 0) return -1;
|
|
||||||
|
|
||||||
// save to name.idx
|
|
||||||
if (metaUpdateNameIdx(pMeta, pME->name, pME->uid) < 0) return -1;
|
|
||||||
|
|
||||||
// save to pTtlIdx if need
|
|
||||||
if (pME->ntbEntry.ttlDays > 0) {
|
|
||||||
dtime = pME->ntbEntry.ctime + pME->ntbEntry.ttlDays * 24 * 60;
|
|
||||||
|
|
||||||
if (metaUpdateTtlIdx(pMeta, dtime, pME->uid) < 0) return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
skmDbKey.uid = pME->uid;
|
||||||
}
|
skmDbKey.sver = pSW->sver;
|
||||||
|
|
||||||
static int metaSaveToSkmDb(SMeta *pMeta, tb_uid_t uid, SSchemaWrapper *pSW) {
|
|
||||||
SCoder coder = {0};
|
|
||||||
void *pVal = NULL;
|
|
||||||
int vLen = 0;
|
|
||||||
int rcode = 0;
|
|
||||||
SSkmDbKey skmDbKey = {.uid = uid, .sver = pSW->sver};
|
|
||||||
|
|
||||||
// encode schema
|
// encode schema
|
||||||
if (tEncodeSize(tEncodeSSchemaWrapper, pSW, vLen) < 0) return -1;
|
if (tEncodeSize(tEncodeSSchemaWrapper, pSW, vLen) < 0) return -1;
|
||||||
pVal = TCODER_MALLOC(&coder, vLen);
|
pVal = taosMemoryMalloc(vLen);
|
||||||
if (pVal == NULL) {
|
if (pVal == NULL) {
|
||||||
rcode = -1;
|
rcode = -1;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
|
||||||
|
tEncodeSSchemaWrapper(&coder, pSW);
|
||||||
|
|
||||||
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, NULL) < 0) {
|
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, NULL) < 0) {
|
||||||
rcode = -1;
|
rcode = -1;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
taosMemoryFree(pVal);
|
||||||
tCoderClear(&coder);
|
tCoderClear(&coder);
|
||||||
|
return rcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
// save to table.db
|
||||||
|
if (metaSaveToTbDb(pMeta, pME) < 0) return -1;
|
||||||
|
|
||||||
|
// update uid.idx
|
||||||
|
if (metaUpdateUidIdx(pMeta, pME) < 0) return -1;
|
||||||
|
|
||||||
|
// update name.idx
|
||||||
|
if (metaUpdateNameIdx(pMeta, pME) < 0) return -1;
|
||||||
|
|
||||||
|
if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
|
// update ctb.idx
|
||||||
|
if (metaUpdateCtbIdx(pMeta, pME) < 0) return -1;
|
||||||
|
|
||||||
|
// update tag.idx
|
||||||
|
if (metaUpdateTagIdx(pMeta, pME) < 0) return -1;
|
||||||
|
} else {
|
||||||
|
// update schema.db
|
||||||
|
if (metaSaveToSkmDb(pMeta, pME) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pME->type != TSDB_SUPER_TABLE) {
|
||||||
|
if (metaUpdateTtlIdx(pMeta, pME) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue