more code

This commit is contained in:
Hongze Cheng 2024-12-06 16:20:53 +08:00
parent c9abce4986
commit 76d36ab9fb
6 changed files with 222 additions and 127 deletions

View File

@ -160,6 +160,7 @@ int metaCreateSuperTable(SMeta* pMeta, int64_t version, SVCreateStbR
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta);
int32_t metaDropTables(SMeta* pMeta, SArray* tbUids);
@ -218,7 +219,7 @@ int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, const SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);

View File

@ -385,11 +385,12 @@ static int32_t metaHandleNormalTableCreateImpl(SMeta *pMeta, const SMetaEntry *p
SMetaTableOp ops[] = {
{META_ENTRY_TABLE, META_TABLE_OP_INSERT}, //
{META_SCHEMA_TABLE, META_TABLE_OP_INSERT}, //
{META_SCHEMA_TABLE, META_TABLE_OP_UPDATA}, // TODO: need to be insert
{META_UID_IDX, META_TABLE_OP_INSERT}, //
{META_NAME_IDX, META_TABLE_OP_INSERT}, //
{META_BTIME_IDX, META_TABLE_OP_INSERT}, //
{META_TTL_IDX, META_TABLE_OP_INSERT}, //
// TODO: need ncol idx
};
for (int i = 0; i < sizeof(ops) / sizeof(ops[0]); i++) {
@ -407,14 +408,25 @@ static int32_t metaHandleNormalTableCreateImpl(SMeta *pMeta, const SMetaEntry *p
static int32_t metaHandleNormalTableCreate(SMeta *pMeta, const SMetaEntry *pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
// update TDB
metaWLock(pMeta);
code = metaHandleNormalTableCreateImpl(pMeta, pEntry);
metaULock(pMeta);
{
// TODO: other stuff
// update other stuff
if (TSDB_CODE_SUCCESS != code) {
pMeta->pVnode->config.vndStats.numOfNTables++;
pMeta->pVnode->config.vndStats.numOfNTimeSeries += pEntry->ntbEntry.schemaRow.nCols - 1;
pMeta->changed = true;
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int32_t rc = tsdbCacheNewTable(pMeta->pVnode->pTsdb, pEntry->uid, -1, &pEntry->ntbEntry.schemaRow);
if (rc < 0) {
metaError("vgId:%d, failed to create table:%s since %s", TD_VID(pMeta->pVnode), pEntry->name, tstrerror(rc));
}
}
} else {
metaErr(TD_VID(pMeta->pVnode), code);
}
return code;
}

View File

@ -87,7 +87,7 @@ static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
}
}
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
if (NULL == pMetaRsp->pSchemas) {
return terrno;
@ -437,7 +437,12 @@ _drop_super_table:
tstrerror(terrno));
}
metaCacheDrop(pMeta, pReq->suid);
ret = metaCacheDrop(pMeta, pReq->suid);
if (ret < 0) {
metaError("vgId:%d, failed to drop stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid,
tstrerror(terrno));
}
ret = tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
if (ret < 0) {
metaError("vgId:%d, failed to drop stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid,

View File

@ -16,6 +16,7 @@
#include "meta.h"
extern int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry);
extern int32_t metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp);
static int32_t metaCheckCreateSuperTableReq(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int32_t vgId = TD_VID(pMeta->pVnode);
@ -129,6 +130,12 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
SMetaReader mr = {0};
int32_t ret;
// validate message
if (pReq->type != TSDB_CHILD_TABLE && pReq->type != TSDB_NORMAL_TABLE) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
}
if (pReq->type == TSDB_CHILD_TABLE) {
tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.stbName);
if (suid != pReq->ctb.suid) {
@ -160,10 +167,11 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
// build SMetaEntry
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
me.version = version;
me.version = ver;
me.type = pReq->type;
me.uid = pReq->uid;
me.name = pReq->name;
if (me.type == TSDB_CHILD_TABLE) {
me.ctbEntry.btime = pReq->btime;
me.ctbEntry.ttlDays = pReq->ttl;
me.ctbEntry.commentLen = pReq->commentLen;
@ -171,6 +179,27 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
me.ctbEntry.suid = pReq->ctb.suid;
me.ctbEntry.pTags = pReq->ctb.pTag;
#ifdef TAG_FILTER_DEBUG
SArray *pTagVals = NULL;
int32_t code = tTagToValArray((STag *)pReq->ctb.pTag, &pTagVals);
for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
memcpy(buf, pTagVal->pData, pTagVal->nData);
metaDebug("metaTag table:%s varchar index:%d cid:%d type:%d value:%s", pReq->name, i, pTagVal->cid,
pTagVal->type, buf);
taosMemoryFree(buf);
} else {
double val = 0;
GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
metaDebug("metaTag table:%s number index:%d cid:%d type:%d value:%f", pReq->name, i, pTagVal->cid,
pTagVal->type, val);
}
}
#endif
++pStats->numOfCTables;
if (!sysTbl) {
@ -204,6 +233,27 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
goto _err;
}
}
} else {
me.ntbEntry.btime = pReq->btime;
me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.commentLen = pReq->commentLen;
me.ntbEntry.comment = pReq->comment;
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
me.colCmpr = pReq->colCmpr;
TABLE_SET_COL_COMPRESSED(me.flags);
++pStats->numOfNTables;
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
ret = tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow);
if (ret < 0) {
metaError("vgId:%d, failed to create table:%s since %s", TD_VID(pMeta->pVnode), pReq->name, tstrerror(ret));
goto _err;
}
}
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
@ -213,10 +263,23 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (*pMetaRsp) {
if (me.type == TSDB_CHILD_TABLE) {
(*pMetaRsp)->tableType = TSDB_CHILD_TABLE;
(*pMetaRsp)->tuid = pReq->uid;
(*pMetaRsp)->suid = pReq->ctb.suid;
strcpy((*pMetaRsp)->tbName, pReq->name);
} else {
ret = metaUpdateMetaRsp(pReq->uid, pReq->name, &pReq->ntb.schemaRow, *pMetaRsp);
if (ret < 0) {
metaError("vgId:%d, failed to update meta rsp:%s since %s", TD_VID(pMeta->pVnode), pReq->name,
tstrerror(ret));
}
for (int32_t i = 0; i < pReq->colCmpr.nCols; i++) {
SColCmpr *p = &pReq->colCmpr.pColCmpr[i];
(*pMetaRsp)->pSchemaExt[i].colId = p->id;
(*pMetaRsp)->pSchemaExt[i].compress = p->alg;
}
}
}
}
@ -237,93 +300,106 @@ _err:
// Alter Child Table
// Create Normal Table
static int32_t metaCheckCreateNormalTableReq(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
int32_t code = 0;
void *value = NULL;
int32_t valueSize = 0;
if (NULL == pReq->name || strlen(pReq->name) == 0) {
metaError("vgId:%d, %s failed at %s:%d since invalid name:%s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, pReq->name, version);
return TSDB_CODE_INVALID_MSG;
}
// check name
if (tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &value, &valueSize) == 0) {
// for auto create table, we return the uid of the existing table
pReq->uid = *(tb_uid_t *)value;
tdbFree(value);
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
}
// grant check
code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " name:%s", TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, tstrerror(code), version, pReq->name);
}
return code;
}
static int32_t metaBuildCreateNormalTableRsp(SMeta *pMeta, SMetaEntry *pEntry, STableMetaRsp **ppRsp) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == ppRsp) {
return code;
}
*ppRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == *ppRsp) {
return terrno;
}
code = metaUpdateMetaRsp(pEntry->uid, pEntry->name, &pEntry->ntbEntry.schemaRow, *ppRsp);
if (code) {
taosMemoryFreeClear(*ppRsp);
return code;
}
for (int32_t i = 0; i < pEntry->colCmpr.nCols; i++) {
SColCmpr *p = &pEntry->colCmpr.pColCmpr[i];
(*ppRsp)->pSchemaExt[i].colId = p->id;
(*ppRsp)->pSchemaExt[i].compress = p->alg;
}
return code;
}
static int32_t metaCreateNormalTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **ppRsp) {
int32_t code = TSDB_CODE_SUCCESS;
#if 0
SMetaEntry me = {0};
SMetaReader mr = {0};
int32_t ret;
// validate req
metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
metaReaderClear(&mr);
return terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
}
pReq->uid = mr.me.uid;
if (pReq->type == TSDB_CHILD_TABLE) {
pReq->ctb.suid = mr.me.ctbEntry.suid;
}
metaReaderClear(&mr);
return terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
} else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
terrno = TSDB_CODE_SUCCESS;
}
metaReaderClear(&mr);
bool sysTbl = (pReq->type == TSDB_CHILD_TABLE) && metaTbInFilterCache(pMeta, pReq->ctb.stbName, 1);
if (!sysTbl && ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0)) goto _err;
// build SMetaEntry
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
me.version = version;
me.type = pReq->type;
me.uid = pReq->uid;
me.name = pReq->name;
me.ntbEntry.btime = pReq->btime;
me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.commentLen = pReq->commentLen;
me.ntbEntry.comment = pReq->comment;
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
me.colCmpr = pReq->colCmpr;
TABLE_SET_COL_COMPRESSED(me.flags);
++pStats->numOfNTables;
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
ret = tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow);
if (ret < 0) {
metaError("vgId:%d, failed to create table:%s since %s", TD_VID(pMeta->pVnode), pReq->name, tstrerror(ret));
goto _err;
// check request
code = metaCheckCreateNormalTableReq(pMeta, version, pReq);
if (code) {
if (TSDB_CODE_TDB_TABLE_ALREADY_EXIST != code) {
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64 " name:%s", TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, tstrerror(code), version, pReq->name);
}
TAOS_RETURN(code);
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
// handle entry
SMetaEntry entry = {
.version = version,
.type = TSDB_NORMAL_TABLE,
.uid = pReq->uid,
.name = pReq->name,
.ntbEntry.btime = pReq->btime,
.ntbEntry.ttlDays = pReq->ttl,
.ntbEntry.commentLen = pReq->commentLen,
.ntbEntry.comment = pReq->comment,
.ntbEntry.schemaRow = pReq->ntb.schemaRow,
.ntbEntry.ncid = pReq->ntb.schemaRow.pSchema[pReq->ntb.schemaRow.nCols - 1].colId + 1,
.colCmpr = pReq->colCmpr,
};
TABLE_SET_COL_COMPRESSED(entry.flags);
metaTimeSeriesNotifyCheck(pMeta);
if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (*pMetaRsp) {
ret = metaUpdateMetaRsp(pReq->uid, pReq->name, &pReq->ntb.schemaRow, *pMetaRsp);
if (ret < 0) {
metaError("vgId:%d, failed to update meta rsp:%s since %s", TD_VID(pMeta->pVnode), pReq->name, tstrerror(ret));
}
for (int32_t i = 0; i < pReq->colCmpr.nCols; i++) {
SColCmpr *p = &pReq->colCmpr.pColCmpr[i];
(*pMetaRsp)->pSchemaExt[i].colId = p->id;
(*pMetaRsp)->pSchemaExt[i].compress = p->alg;
}
}
// build response
code = metaBuildCreateNormalTableRsp(pMeta, &entry, ppRsp);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
}
pMeta->changed = true;
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
pReq->type);
return 0;
_err:
metaError("vgId:%d, failed to create table:%s type:%s since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->type == TSDB_CHILD_TABLE ? "child table" : "normal table", tstrerror(terrno));
return TSDB_CODE_FAILED;
#endif
return code;
code = metaHandleEntry2(pMeta, &entry);
if (TSDB_CODE_SUCCESS == code) {
metaInfo("vgId:%d, normal table:%s uid %" PRId64 " is created, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name,
pReq->uid, version);
} else {
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version);
}
TAOS_RETURN(code);
}
// Drop Normal Table
@ -333,11 +409,12 @@ _err:
int32_t metaCreateTable2(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **ppRsp) {
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CHILD_TABLE == pReq->type) {
code = metaCreateChildTable(pMeta, version, pReq, ppRsp);
// TODO
code = metaCreateTable(pMeta, version, pReq, ppRsp);
} else if (TSDB_NORMAL_TABLE == pReq->type) {
code = metaCreateNormalTable(pMeta, version, pReq, ppRsp);
} else {
code = TSDB_CODE_INVALID_MSG;
}
return code;
TAOS_RETURN(code);
}

View File

@ -776,7 +776,7 @@ _exit:
TAOS_RETURN(code);
}
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
int32_t code = 0;
(void)taosThreadMutexLock(&pTsdb->lruMutex);

View File

@ -1233,7 +1233,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
}
// do create table
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
if (metaCreateTable2(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS;
} else {
@ -1961,7 +1961,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
// create table
if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
if (metaCreateTable2(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
// create table success
if (newTbUids == NULL &&