more code

This commit is contained in:
Hongze Cheng 2024-12-14 18:06:50 +08:00
parent 161081a0a2
commit 389f545a53
6 changed files with 111 additions and 165 deletions

View File

@ -155,17 +155,13 @@ int metaCommit(SMeta* pMeta, TXN* txn);
int metaFinishCommit(SMeta* pMeta, TXN* txn);
int metaPrepareAsyncCommit(SMeta* pMeta);
int metaAbort(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaCreateSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int32_t metaAlterSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int32_t metaDropSuperTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaDropTable2(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int32_t metaTrimTables(SMeta* pMeta);
int32_t metaDropTables(SMeta* pMeta, SArray* tbUids);
int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);

View File

@ -1085,7 +1085,6 @@ static int32_t metaHandleSuperTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
if (TSDB_CODE_SUCCESS == code) {
pMeta->pVnode->config.vndStats.numOfSTables++;
pMeta->changed = true;
metaInfo("vgId:%d, %s success, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", TD_VID(pMeta->pVnode),
__func__, pEntry->version, pEntry->type, pEntry->uid, pEntry->name);
@ -1135,7 +1134,6 @@ static int32_t metaHandleNormalTableCreate(SMeta *pMeta, const SMetaEntry *pEntr
if (TSDB_CODE_SUCCESS == code) {
pMeta->pVnode->config.vndStats.numOfNTables++;
pMeta->pVnode->config.vndStats.numOfNTimeSeries += pEntry->ntbEntry.schemaRow.nCols - 1;
pMeta->changed = true;
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int32_t rc = tsdbCacheNewTable(pMeta->pVnode->pTsdb, pEntry->uid, -1, &pEntry->ntbEntry.schemaRow);
@ -1229,7 +1227,6 @@ static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
}
}
pMeta->changed = true;
} else {
metaErr(TD_VID(pMeta->pVnode), code);
}
@ -1309,7 +1306,6 @@ static int32_t metaHandleNormalTableDrop(SMeta *pMeta, const SMetaEntry *pEntry)
}
}
pMeta->changed = true;
metaFetchEntryFree(&pOldEntry);
return code;
}
@ -1424,7 +1420,6 @@ static int32_t metaHandleChildTableDrop(SMeta *pMeta, const SMetaEntry *pEntry,
*tbUid = uid;
}
#endif
pMeta->changed = true;
metaFetchEntryFree(&pChild);
metaFetchEntryFree(&pSuper);
return code;
@ -1781,7 +1776,6 @@ static int32_t metaHandleSuperTableDrop(SMeta *pMeta, const SMetaEntry *pEntry)
// do other stuff
metaUpdTimeSeriesNum(pMeta);
pMeta->changed = true;
// free resource and return
taosArrayDestroy(childList);
@ -1858,6 +1852,7 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
}
if (TSDB_CODE_SUCCESS == code) {
pMeta->changed = true;
metaDebug("vgId:%d, %s success, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__,
pEntry->version, pEntry->type, pEntry->uid, pEntry->type > 0 ? pEntry->name : "");
} else {

View File

@ -301,7 +301,7 @@ void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
#endif
}
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
static int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry me = {0};
int kLen = 0;
int vLen = 0;
@ -361,7 +361,7 @@ _err:
return code;
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
static int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
@ -521,7 +521,7 @@ static int32_t metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) {
return 0;
}
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
static int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TBC *pUidIdxc = NULL;
@ -689,7 +689,8 @@ _exit:
tdbTbcClose(pUidIdxc);
return 0;
}
int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
static int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
STbDbKey tbDbKey = {0};
@ -874,7 +875,8 @@ _err:
return code;
}
int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
static int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
int32_t code = 0;
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
@ -1061,7 +1063,7 @@ _err:
return code;
}
int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
static int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0};
SMetaReader mr = {0};
int32_t ret;
@ -1230,7 +1232,7 @@ _err:
return TSDB_CODE_FAILED;
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
static int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
void *pData = NULL;
int nData = 0;
int rc = 0;
@ -1284,7 +1286,7 @@ _exit:
return rc;
}
int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
static int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
int32_t code = 0;
if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS;
@ -1676,7 +1678,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
return 0;
}
// opt ins_tables
int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SBtimeIdxKey btimeKey = {0};
if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0;
@ -1687,13 +1689,14 @@ int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbUpsert(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), NULL, 0, pMeta->txn);
}
int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SBtimeIdxKey btimeKey = {0};
if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0;
}
return tdbTbDelete(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), pMeta->txn);
}
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
SNcolIdxKey ncolKey = {0};
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
@ -2625,144 +2628,6 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
return 0;
}
static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry stbEntry = {0};
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
tb_uid_t uid, suid;
int64_t oversion;
const void *pData = NULL;
int nData = 0;
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
} else {
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
}
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
ret = -1;
goto _err;
}
suid = ((SUidIdxVal *)pVal)[0].suid;
STbDbKey tbDbKey = {0};
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
ret = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
if (ret < 0) {
goto _err;
}
tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry);
if (ret < 0) {
tDecoderClear(&dc);
goto _err;
}
// Get target schema info
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SSchema *pCol = NULL;
int32_t iCol = 0;
for (;;) {
pCol = NULL;
if (iCol >= pTagSchema->nCols) break;
pCol = &pTagSchema->pSchema[iCol];
if (strcmp(pCol->name, pAlterTbReq->tagName) == 0) break;
iCol++;
}
if (iCol == 0) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
if (pCol == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
/*
* iterator all pTdDbc by uid and version
*/
TBC *pCtbIdxc = NULL;
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL));
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _err;
}
for (;;) {
void *pKey, *pVal;
int nKey, nVal;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
if (rc < 0) break;
if (((SCtbIdxKey *)pKey)->suid != uid) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const void *pTagData = NULL;
int32_t nTagData = 0;
STagVal tagVal = {.cid = pCol->colId};
if (tTagGet((const STag *)pVal, &tagVal)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pCol->type)) {
nTagData = tDataTypes[pCol->type].bytes;
}
}
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
tdbFree(pKey);
tdbFree(pVal);
metaDestroyTagIdxKey(pTagIdxKey);
tdbTbcClose(pCtbIdxc);
goto _err;
}
ret = tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
if (ret < 0) {
metaError("meta/table: failed to upsert tag idx:%s uid:%" PRId64, stbEntry.name, stbEntry.uid);
}
metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
}
tdbTbcClose(pCtbIdxc);
return 0;
_err:
// tDecoderClear(&dc1);
// tDecoderClear(&dc2);
// if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
// tdbTbcClose(pTbDbc);
// tdbTbcClose(pUidIdxc);
return TSDB_CODE_FAILED;
}
typedef struct SMetaPair {
void *key;
int nkey;
@ -2887,7 +2752,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
_err:
return TSDB_CODE_FAILED;
}
int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
// impl later
SMetaEntry tbEntry = {0};
void *pVal = NULL;

View File

@ -1742,4 +1742,94 @@ int32_t metaDropIndexFromSuperTable(SMeta *pMeta, int64_t version, SDropIndexReq
metaFetchEntryFree(&pEntry);
TAOS_RETURN(code);
}
int32_t metaAlterSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pReq->name || strlen(pReq->name) == 0) {
metaError("vgId:%d, %s failed at %s:%d since invalid table name, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, version);
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
}
SMetaEntry *pEntry = NULL;
code = metaFetchEntryByName(pMeta, pReq->name, &pEntry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, pReq->name, version);
TAOS_RETURN(TSDB_CODE_TDB_STB_NOT_EXIST);
}
if (pEntry->type != TSDB_SUPER_TABLE) {
metaError("vgId:%d, %s failed at %s:%d since table %s type %d is invalid, version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, pReq->name, pEntry->type, version);
metaFetchEntryFree(&pEntry);
TAOS_RETURN(TSDB_CODE_VND_INVALID_TABLE_ACTION);
}
SMetaEntry entry = {
.version = version,
.type = TSDB_SUPER_TABLE,
.uid = pReq->suid,
.name = pReq->name,
.stbEntry.schemaRow = pReq->schemaRow,
.stbEntry.schemaTag = pReq->schemaTag,
.colCmpr = pReq->colCmpr,
};
TABLE_SET_COL_COMPRESSED(entry.flags);
// do handle the entry
code = metaHandleEntry2(pMeta, &entry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, tstrerror(code), pReq->suid, pReq->name, version);
metaFetchEntryFree(&pEntry);
TAOS_RETURN(code);
} else {
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name,
pReq->suid, version);
}
metaFetchEntryFree(&pEntry);
TAOS_RETURN(code);
}
int32_t metaDropMultipleTables(SMeta *pMeta, int64_t version, SArray *uidArray) {
int32_t code = 0;
if (taosArrayGetSize(uidArray) == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < taosArrayGetSize(uidArray); i++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(uidArray, i);
SMetaInfo info;
code = metaGetInfo(pMeta, uid, &info, NULL);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since table uid %" PRId64 " not found, code:%d", TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, uid, code);
return code;
}
SMetaEntry entry = {
.version = version,
.uid = uid,
};
if (info.suid == 0) {
entry.type = -TSDB_NORMAL_TABLE;
} else if (info.suid == uid) {
entry.type = -TSDB_SUPER_TABLE;
} else {
entry.type = -TSDB_CHILD_TABLE;
}
code = metaHandleEntry2(pMeta, &entry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, tstrerror(code), uid, version);
return code;
}
}
return code;
}

View File

@ -129,7 +129,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag;
TAOS_CHECK_EXIT(metaCreateSTable(SMA_META(pSma), ver, &pReq));
TAOS_CHECK_EXIT(metaCreateSuperTable(SMA_META(pSma), ver, &pReq));
} else {
TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
}
@ -204,7 +204,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue;
}
if( taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
code = terrno;
continue;
}

View File

@ -1038,7 +1038,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
}
if (ttlReq.nUids > 0) {
int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids);
int32_t code = metaDropMultipleTables(pVnode->pMeta, ver, ttlReq.pTbUids);
if (code) return code;
code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
@ -1344,7 +1344,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq,
return code;
}
code = metaAlterSTable(pVnode->pMeta, ver, &req);
code = metaAlterSuperTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
tDecoderClear(&dc);