refact: create super table code

This commit is contained in:
Hongze Cheng 2024-12-04 17:39:28 +08:00
parent b2eded387f
commit 3e06712493
5 changed files with 417 additions and 3551 deletions

View File

@ -156,6 +156,7 @@ int metaFinishCommit(SMeta* pMeta, TXN* txn);
int metaPrepareAsyncCommit(SMeta* pMeta);
int metaAbort(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaCreateSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);

View File

@ -10,15 +10,242 @@
#include "meta.h"
#define metaErrLog(ERRNO, INFO) \
#define metaErr(VGID, ERRNO) \
do { \
if (INFO) { \
metaError("%s failed at %s:%d since %s, info:%s", __func__, __FILE__, __LINE__, tstrerror(ERRNO), INFO); \
} else { \
metaError("%s failed at %s:%d since %s", __func__, __FILE__, __LINE__, tstrerror(ERRNO)); \
} \
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", VGID, \
__func__, __FILE__, __LINE__, tstrerror(ERRNO), pEntry->version, pEntry->type, pEntry->uid, \
pEntry->type > 0 ? pEntry->name : NULL); \
} while (0)
// Entry Table
static int32_t metaEntryTableInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
void *value = NULL;
int32_t valueSize = 0;
SEncoder encoder = {0};
STbDbKey key = {
.version = pEntry->version,
.uid = pEntry->uid,
};
// encode entry
tEncodeSize(metaEncodeEntry, pEntry, valueSize, code);
if (code != 0) {
metaErr(vgId, code);
return code;
}
value = taosMemoryMalloc(valueSize);
if (NULL == value) {
metaErr(vgId, terrno);
return terrno;
}
tEncoderInit(&encoder, value, valueSize);
code = metaEncodeEntry(&encoder, pEntry);
if (code) {
metaErr(vgId, code);
tEncoderClear(&encoder);
taosMemoryFree(value);
return code;
}
tEncoderClear(&encoder);
// put to tdb
code = tdbTbInsert(pMeta->pTbDb, &key, sizeof(key), value, valueSize, pMeta->txn);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
}
taosMemoryFree(value);
return code;
}
static int32_t metaEntryTableDrop(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
// TODO
return code;
}
// Schema Table
static int32_t metaSchemaTableInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
SEncoder encoder = {0};
void *value = NULL;
int32_t valueSize = 0;
const SSchemaWrapper *pSchema = NULL;
if (pEntry->type == TSDB_SUPER_TABLE) {
pSchema = &pEntry->stbEntry.schemaRow;
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
pSchema = &pEntry->ntbEntry.schemaRow;
} else {
return TSDB_CODE_INVALID_PARA;
}
SSkmDbKey key = {
.uid = pEntry->uid,
.sver = pSchema->version,
};
// encode schema
tEncodeSize(tEncodeSSchemaWrapper, pSchema, valueSize, code);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
tEncoderInit(&encoder, value, valueSize);
code = tEncodeSSchemaWrapper(&encoder, pSchema);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
tEncoderClear(&encoder);
taosMemoryFree(value);
return code;
}
tEncoderClear(&encoder);
// put to tdb
code = tdbTbInsert(pMeta->pSkmDb, &key, sizeof(key), value, valueSize, pMeta->txn);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
}
taosMemoryFree(value);
return code;
}
// Uid Index
static void metaBuildEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
pInfo->uid = pEntry->uid;
pInfo->version = pEntry->version;
if (pEntry->type == TSDB_SUPER_TABLE) {
pInfo->suid = pEntry->uid;
pInfo->skmVer = pEntry->stbEntry.schemaRow.version;
} else if (pEntry->type == TSDB_CHILD_TABLE) {
pInfo->suid = pEntry->ctbEntry.suid;
pInfo->skmVer = 0;
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
pInfo->suid = 0;
pInfo->skmVer = pEntry->ntbEntry.schemaRow.version;
}
}
static int32_t metaUidIdxUpsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
// update cache
SMetaInfo info = {0};
metaBuildEntryInfo(pEntry, &info);
code = metaCacheUpsert(pMeta, &info);
if (code) {
metaErr(vgId, code);
}
// put to tdb
SUidIdxVal value = {
.suid = 0,
.skmVer = 0,
.version = pEntry->version,
};
code = tdbTbUpsert(pMeta->pUidIdx, &pEntry->uid, sizeof(pEntry->uid), &value, sizeof(value), pMeta->txn);
return code;
}
// Name Index
static int32_t metaNameIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = tdbTbInsert(pMeta->pNameIdx, pEntry->name, strlen(pEntry->name) + 1, &pEntry->uid, sizeof(pEntry->uid),
pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
}
static int32_t metaNameIdxDelete(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = tdbTbDelete(pMeta->pNameIdx, pEntry->name, strlen(pEntry->name) + 1, pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
}
// Suid Index
static int32_t metaSUidIdxInsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = tdbTbInsert(pMeta->pSuidIdx, &pEntry->uid, sizeof(pEntry->uid), NULL, 0, pMeta->txn);
if (code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
}
// Tag Index
static int32_t metaHandleSuperTableCreateImpl(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
// Insert into Entry Table
code = metaEntryTableInsert(pMeta, pEntry);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
// Insert into Schema Table
code = metaSchemaTableInsert(pMeta, pEntry);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
// Insert to Uid Index
code = metaUidIdxUpsert(pMeta, pEntry);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
// Insert to Name Index
code = metaNameIdxInsert(pMeta, pEntry);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
// Insert to Suid Index
code = metaSUidIdxInsert(pMeta, pEntry);
if (TSDB_CODE_SUCCESS != code) {
metaErr(vgId, code);
return code;
}
return code;
}
static int32_t metaHandleSuperTableCreate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
metaWLock(pMeta);
code = metaHandleSuperTableCreateImpl(pMeta, pEntry);
metaULock(pMeta);
if (TSDB_CODE_SUCCESS == code) {
pMeta->pVnode->config.vndStats.numOfSTables++;
pMeta->changed = true;
metaInfo("vgId:%d, %s success, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__,
pEntry->version, pEntry->type, pEntry->uid, pEntry->name);
} else {
metaError("vgId:%d, %s failed at %s:%d since %s, version" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__,
__FILE__, __LINE__, tstrerror(code), pEntry->version, pEntry->type, pEntry->uid, pEntry->name);
}
return code;
}
static int32_t metaHandleNormalTableDrop(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
@ -37,117 +264,74 @@ static int32_t metaHandleSuperTableDrop(SMeta *pMeta, const SMetaEntry *pEntry)
return code;
}
static int32_t metaHandleEntryDrop(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int8_t type = -pEntry->type;
if (type == TSDB_NORMAL_TABLE) {
code = metaHandleNormalTableDrop(pMeta, pEntry);
} else if (type == TSDB_CHILD_TABLE) {
code = metaHandleChildTableDrop(pMeta, pEntry);
} else if (type == TSDB_SUPER_TABLE) {
code = metaHandleSuperTableDrop(pMeta, pEntry);
} else {
code = TSDB_CODE_INVALID_PARA;
metaErrLog(code, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
} else {
}
return code;
}
static int32_t metaHandleNormalTableEntryUpdate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO:
return code;
}
static int32_t metaHandleNormalTableEntryCreateImpl(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
// code = metaSaveToTbDb(pMeta, pEntry);
// code = metaUpdateUidIdx(pMeta, pEntry);
// code = metaUpdatenameIdx(pMeta, pEntry);
// code = metaSaveToSkmDb(pMeta, pEntry);
// code = metaUpdateBtimeIdx(pMeta, pEntry);
// code = metaUpdateNcolIdx(pMeta, pEntry);
// code = metaUpdateTtl(pMeta, pEntry);
return code;
}
static int32_t metaHandleNormalTableEntryCreate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO: do neccessary check
// do handle entry
metaWLock(pMeta);
code = metaHandleNormalTableEntryCreateImpl(pMeta, pEntry);
metaULock(pMeta);
return code;
}
static int32_t metaHandleNormalTableEntryUpsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
SMetaInfo metaInfo = {0};
if (TSDB_CODE_SUCCESS == metaGetInfo(pMeta, pEntry->uid, &metaInfo, NULL)) {
code = metaHandleNormalTableEntryUpdate(pMeta, pEntry);
} else {
code = metaHandleNormalTableEntryCreate(pMeta, pEntry);
}
return code;
}
static int32_t metaHandleSuperTableEntryUpsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
return code;
}
static int32_t metaHandleChildTableEntryUpsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
return code;
}
static int32_t metaHandleEntryUpsert(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
if (pEntry->type == TSDB_NORMAL_TABLE) {
code = metaHandleNormalTableEntryUpsert(pMeta, pEntry);
} else if (pEntry->type == TSDB_SUPER_TABLE) {
code = metaHandleSuperTableEntryUpsert(pMeta, pEntry);
} else if (pEntry->type == TSDB_CHILD_TABLE) {
code = metaHandleChildTableEntryUpsert(pMeta, pEntry);
} else {
code = TSDB_CODE_INVALID_PARA;
metaErrLog(code, NULL);
}
return code;
}
int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
SMetaInfo info = {0};
int8_t type = pEntry->type > 0 ? pEntry->type : -pEntry->type;
if (pMeta == NULL || pEntry == NULL) {
metaErrLog(TSDB_CODE_INVALID_PARA, NULL);
if (NULL == pMeta || NULL == pEntry || type != TSDB_SUPER_TABLE || type != TSDB_CHILD_TABLE ||
type != TSDB_NORMAL_TABLE) {
metaError("%s failed at %s:%d since invalid parameter", __func__, __FILE__, __LINE__);
return TSDB_CODE_INVALID_PARA;
}
if (pEntry->type > 0) {
code = metaHandleEntryUpsert(pMeta, pEntry);
} else {
code = metaHandleEntryDrop(pMeta, pEntry);
bool isExist = false;
if (TSDB_CODE_SUCCESS == metaGetInfo(pMeta, pEntry->uid, &info, NULL)) {
isExist = true;
}
return code;
switch (type) {
case TSDB_SUPER_TABLE:
if (isExist) {
// code = metaHandleSuperTableUpdate(pMeta, pEntry);
} else {
code = metaHandleSuperTableCreate(pMeta, pEntry);
}
break;
case TSDB_CHILD_TABLE:
if (isExist) {
// code = metaHandleChildTableUpdate(pMeta, pEntry);
} else {
// code = metaHandleChildTableCreate(pMeta, pEntry);
}
break;
case TSDB_NORMAL_TABLE:
if (isExist) {
// code = metaHandleNormalTableUpdate(pMeta, pEntry);
} else {
// code = metaHandleNormalTableCreate(pMeta, pEntry);
}
break;
default:
code = TSDB_CODE_INVALID_PARA;
break;
}
} else {
switch (type) {
case TSDB_SUPER_TABLE:
// code = metaHandleSuperTableDrop(pMeta, pEntry);
break;
case TSDB_CHILD_TABLE:
// code = metaHandleChildTableDrop(pMeta, pEntry);
break;
case TSDB_NORMAL_TABLE:
// code = metaHandleNormalTableDrop(pMeta, pEntry);
break;
default:
code = TSDB_CODE_INVALID_PARA;
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
metaDebug("vgId:%d, %s success, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__,
pEntry->version, pEntry->type, pEntry->uid, pEntry->type > 0 ? pEntry->name : "");
} else {
metaError("vgId:%d, %s failed at %s:%d since %s, version" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__,
__FILE__, __LINE__, tstrerror(code), pEntry->version, pEntry->type, pEntry->uid,
pEntry->type > 0 ? pEntry->name : "");
}
TAOS_RETURN(code);
}

File diff suppressed because it is too large Load Diff

View File

@ -1145,7 +1145,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto _err;
}
code = metaCreateSTable(pVnode->pMeta, ver, &req);
code = metaCreateSuperTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
goto _err;
@ -2579,4 +2579,3 @@ _OVER:
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
#endif