Merge pull request #29041 from taosdata/enh/TD-29367-3.0

Enh/TD-29367-3.0
This commit is contained in:
Shengliang Guan 2024-12-16 21:33:20 +08:00 committed by GitHub
commit 9becb9ca5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 4122 additions and 313 deletions

View File

@ -62,7 +62,7 @@ typedef struct SMetaEntry {
struct { struct {
int64_t btime; int64_t btime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen; // not include '\0'
char* comment; char* comment;
tb_uid_t suid; tb_uid_t suid;
uint8_t* pTags; uint8_t* pTags;

View File

@ -32,6 +32,8 @@ set(
"src/meta/metaSnapshot.c" "src/meta/metaSnapshot.c"
"src/meta/metaCache.c" "src/meta/metaCache.c"
"src/meta/metaTtl.c" "src/meta/metaTtl.c"
"src/meta/metaEntry2.c"
"src/meta/metaTable2.c"
# sma # sma
"src/sma/smaEnv.c" "src/sma/smaEnv.c"

View File

@ -60,7 +60,7 @@ int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); } static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
// metaTable ================== // metaTable ==================
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME); int32_t metaHandleEntry2(SMeta* pMeta, const SMetaEntry* pEntry);
// metaCache ================== // metaCache ==================
int32_t metaCacheOpen(SMeta* pMeta); int32_t metaCacheOpen(SMeta* pMeta);

View File

@ -155,13 +155,13 @@ int metaCommit(SMeta* pMeta, TXN* txn);
int metaFinishCommit(SMeta* pMeta, TXN* txn); int metaFinishCommit(SMeta* pMeta, TXN* txn);
int metaPrepareAsyncCommit(SMeta* pMeta); int metaPrepareAsyncCommit(SMeta* pMeta);
int metaAbort(SMeta* pMeta); int metaAbort(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int32_t metaAlterSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int32_t metaDropSuperTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int32_t metaDropTable2(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int32_t metaDropTables(SMeta* pMeta, SArray* tbUids); int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
@ -176,8 +176,8 @@ int metaAlterCache(SMeta* pMeta, int32_t nPage);
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid); int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int32_t metaAddIndexToSuperTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int32_t metaDropIndexFromSuperTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type); int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
void metaUpdTimeSeriesNum(SMeta* pMeta); void metaUpdTimeSeriesNum(SMeta* pMeta);
@ -217,7 +217,7 @@ int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbPrepareCommit(STsdb* pTsdb); // int32_t tsdbPrepareCommit(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, const SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);

View File

@ -65,12 +65,32 @@ static FORCE_INLINE int32_t metatInitDefaultSColCmprWrapper(SDecoder *pDecoder,
return 0; return 0;
} }
static int32_t metaCloneColCmpr(const SColCmprWrapper *pSrc, SColCmprWrapper *pDst) {
if (pSrc->nCols > 0) {
pDst->nCols = pSrc->nCols;
pDst->version = pSrc->version;
pDst->pColCmpr = (SColCmpr *)taosMemoryCalloc(pSrc->nCols, sizeof(SColCmpr));
if (NULL == pDst->pColCmpr) {
return terrno;
}
memcpy(pDst->pColCmpr, pSrc->pColCmpr, pSrc->nCols * sizeof(SColCmpr));
}
return 0;
}
static void metaCloneColCmprFree(SColCmprWrapper *pCmpr) {
if (pCmpr) {
taosMemoryFreeClear(pCmpr->pColCmpr);
}
}
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
TAOS_CHECK_RETURN(tStartEncode(pCoder)); TAOS_CHECK_RETURN(tStartEncode(pCoder));
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->version)); TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->version));
TAOS_CHECK_RETURN(tEncodeI8(pCoder, pME->type)); TAOS_CHECK_RETURN(tEncodeI8(pCoder, pME->type));
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->uid)); TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->uid));
if (pME->type > 0) {
if (pME->name == NULL) { if (pME->name == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -109,6 +129,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
TAOS_CHECK_RETURN(meteEncodeColCmprEntry(pCoder, pME)); TAOS_CHECK_RETURN(meteEncodeColCmprEntry(pCoder, pME));
}
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
@ -119,6 +140,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version)); TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version));
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type)); TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type));
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid)); TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid));
if (pME->type > 0) {
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name)); TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name));
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
@ -180,7 +203,164 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
} }
TABLE_SET_COL_COMPRESSED(pME->flags); TABLE_SET_COL_COMPRESSED(pME->flags);
} }
}
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) {
if (pSrc == NULL || pDst == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pDst->nCols = pSrc->nCols;
pDst->version = pSrc->version;
pDst->pSchema = (SSchema *)taosMemoryMalloc(pSrc->nCols * sizeof(SSchema));
if (pDst->pSchema == NULL) {
return terrno;
}
memcpy(pDst->pSchema, pSrc->pSchema, pSrc->nCols * sizeof(SSchema));
return TSDB_CODE_SUCCESS;
}
static void metaCloneSchemaFree(SSchemaWrapper *pSchema) {
if (pSchema) {
taosMemoryFreeClear(pSchema->pSchema);
}
}
void metaCloneEntryFree(SMetaEntry **ppEntry) {
if (ppEntry == NULL || *ppEntry == NULL) {
return;
}
taosMemoryFreeClear((*ppEntry)->name);
if ((*ppEntry)->type < 0) {
taosMemoryFreeClear(*ppEntry);
return;
}
if (TSDB_SUPER_TABLE == (*ppEntry)->type) {
metaCloneSchemaFree(&(*ppEntry)->stbEntry.schemaRow);
metaCloneSchemaFree(&(*ppEntry)->stbEntry.schemaTag);
} else if (TSDB_CHILD_TABLE == (*ppEntry)->type) {
taosMemoryFreeClear((*ppEntry)->ctbEntry.comment);
taosMemoryFreeClear((*ppEntry)->ctbEntry.pTags);
} else if (TSDB_NORMAL_TABLE == (*ppEntry)->type) {
metaCloneSchemaFree(&(*ppEntry)->ntbEntry.schemaRow);
taosMemoryFreeClear((*ppEntry)->ntbEntry.comment);
} else {
return;
}
metaCloneColCmprFree(&(*ppEntry)->colCmpr);
taosMemoryFreeClear(*ppEntry);
return;
}
int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pEntry || NULL == ppEntry) {
return TSDB_CODE_INVALID_PARA;
}
*ppEntry = (SMetaEntry *)taosMemoryCalloc(1, sizeof(SMetaEntry));
if (NULL == *ppEntry) {
return terrno;
}
(*ppEntry)->version = pEntry->version;
(*ppEntry)->type = pEntry->type;
(*ppEntry)->uid = pEntry->uid;
if (pEntry->type < 0) {
return TSDB_CODE_SUCCESS;
}
if (pEntry->name) {
(*ppEntry)->name = tstrdup(pEntry->name);
if (NULL == (*ppEntry)->name) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
}
if (pEntry->type == TSDB_SUPER_TABLE) {
(*ppEntry)->flags = pEntry->flags;
code = metaCloneSchema(&pEntry->stbEntry.schemaRow, &(*ppEntry)->stbEntry.schemaRow);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
code = metaCloneSchema(&pEntry->stbEntry.schemaTag, &(*ppEntry)->stbEntry.schemaTag);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
} else if (pEntry->type == TSDB_CHILD_TABLE) {
(*ppEntry)->ctbEntry.btime = pEntry->ctbEntry.btime;
(*ppEntry)->ctbEntry.ttlDays = pEntry->ctbEntry.ttlDays;
(*ppEntry)->ctbEntry.suid = pEntry->ctbEntry.suid;
// comment
(*ppEntry)->ctbEntry.commentLen = pEntry->ctbEntry.commentLen;
if (pEntry->ctbEntry.commentLen > 0) {
(*ppEntry)->ctbEntry.comment = taosMemoryMalloc(pEntry->ctbEntry.commentLen + 1);
if (NULL == (*ppEntry)->ctbEntry.comment) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ctbEntry.comment, pEntry->ctbEntry.comment, pEntry->ctbEntry.commentLen + 1);
}
// tags
STag *pTags = (STag *)pEntry->ctbEntry.pTags;
(*ppEntry)->ctbEntry.pTags = taosMemoryCalloc(1, pTags->len);
if (NULL == (*ppEntry)->ctbEntry.pTags) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ctbEntry.pTags, pEntry->ctbEntry.pTags, pTags->len);
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
(*ppEntry)->ntbEntry.btime = pEntry->ntbEntry.btime;
(*ppEntry)->ntbEntry.ttlDays = pEntry->ntbEntry.ttlDays;
(*ppEntry)->ntbEntry.ncid = pEntry->ntbEntry.ncid;
// schema
code = metaCloneSchema(&pEntry->ntbEntry.schemaRow, &(*ppEntry)->ntbEntry.schemaRow);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
// comment
(*ppEntry)->ntbEntry.commentLen = pEntry->ntbEntry.commentLen;
if (pEntry->ntbEntry.commentLen > 0) {
(*ppEntry)->ntbEntry.comment = taosMemoryMalloc(pEntry->ntbEntry.commentLen + 1);
if (NULL == (*ppEntry)->ntbEntry.comment) {
code = terrno;
metaCloneEntryFree(ppEntry);
return code;
}
memcpy((*ppEntry)->ntbEntry.comment, pEntry->ntbEntry.comment, pEntry->ntbEntry.commentLen + 1);
}
} else {
return TSDB_CODE_INVALID_PARA;
}
code = metaCloneColCmpr(&pEntry->colCmpr, &(*ppEntry)->colCmpr);
if (code) {
metaCloneEntryFree(ppEntry);
return code;
}
return code;
}

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,7 @@
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
@ -330,7 +330,7 @@ static int32_t metaGenerateNewMeta(SMeta **ppMeta) {
tdbTbGet(pMeta->pUidIdx, &me.ctbEntry.suid, sizeof(me.ctbEntry.suid), NULL, NULL) != 0) { tdbTbGet(pMeta->pUidIdx, &me.ctbEntry.suid, sizeof(me.ctbEntry.suid), NULL, NULL) != 0) {
metaError("vgId:%d failed to get super table uid:%" PRId64 " for child table uid:%" PRId64, metaError("vgId:%d failed to get super table uid:%" PRId64 " for child table uid:%" PRId64,
TD_VID(pVnode), me.ctbEntry.suid, uid); TD_VID(pVnode), me.ctbEntry.suid, uid);
} else if (metaHandleEntry(pNewMeta, &me) != 0) { } else if (metaHandleEntry2(pNewMeta, &me) != 0) {
metaError("vgId:%d failed to handle entry, uid:%" PRId64, TD_VID(pVnode), uid); metaError("vgId:%d failed to handle entry, uid:%" PRId64, TD_VID(pVnode), uid);
} }
} }
@ -598,7 +598,7 @@ static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0; return 0;
} }
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1; STagIdxKey *pTagIdxKey1 = (STagIdxKey *)pKey1;
STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2; STagIdxKey *pTagIdxKey2 = (STagIdxKey *)pKey2;
tb_uid_t uid1 = 0, uid2 = 0; tb_uid_t uid1 = 0, uid2 = 0;

View File

@ -72,7 +72,6 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t nKey = 0; int32_t nKey = 0;
int32_t nData = 0; int32_t nData = 0;
STbDbKey key; STbDbKey key;
SMetaInfo info;
*ppData = NULL; *ppData = NULL;
for (;;) { for (;;) {
@ -85,8 +84,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
goto _exit; goto _exit;
} }
if (key.version < pReader->sver // if (key.version < pReader->sver) {
|| metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
if (tdbTbcMoveToNext(pReader->pTbc) != 0) { if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode)); metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
} }
@ -199,7 +197,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
code = metaDecodeEntry(pDecoder, &metaEntry); code = metaDecodeEntry(pDecoder, &metaEntry);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = metaHandleEntry(pMeta, &metaEntry); code = metaHandleEntry2(pMeta, &metaEntry);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:

View File

@ -17,8 +17,18 @@
extern SDmNotifyHandle dmNotifyHdl; extern SDmNotifyHandle dmNotifyHdl;
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
int32_t metaUpdateTableTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
int32_t metaUpdateTableMultiTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
int32_t metaUpdateTableOptions2(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
int32_t metaUpdateTableColCompress2(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
int32_t metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
int32_t metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -29,30 +39,30 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry); static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl); static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl);
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey); void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
// opt ins_tables query // opt ins_tables query
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) { int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
int32_t nCols = pWp->nCols; int32_t nCols = pWp->nCols;
int32_t ver = pWp->version; int32_t ver = pWp->version;
if (add) { if (add) {
SColCmpr *p = taosMemoryCalloc(1, sizeof(SColCmpr) * (nCols + 1)); SColCmpr *p = taosMemoryRealloc(pWp->pColCmpr, sizeof(SColCmpr) * (nCols + 1));
if (p == NULL) { if (p == NULL) {
return terrno; return terrno;
} }
pWp->pColCmpr = p;
memcpy(p, pWp->pColCmpr, sizeof(SColCmpr) * nCols);
SColCmpr *pCol = p + nCols; SColCmpr *pCol = p + nCols;
pCol->id = pSchema->colId; pCol->id = pSchema->colId;
pCol->alg = compress; pCol->alg = compress;
pWp->nCols = nCols + 1; pWp->nCols = nCols + 1;
pWp->version = ver; pWp->version = ver;
pWp->pColCmpr = p;
} else { } else {
for (int32_t i = 0; i < nCols; i++) { for (int32_t i = 0; i < nCols; i++) {
SColCmpr *pOCmpr = &pWp->pColCmpr[i]; SColCmpr *pOCmpr = &pWp->pColCmpr[i];
@ -87,7 +97,7 @@ static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
} }
} }
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) { int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema)); pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
if (NULL == pMetaRsp->pSchemas) { if (NULL == pMetaRsp->pSchemas) {
return terrno; return terrno;
@ -110,7 +120,7 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema
return 0; return 0;
} }
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) { int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
int32_t code = 0; int32_t code = 0;
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
@ -279,7 +289,7 @@ _exception:
return code; return code;
} }
static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) { void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
#if defined(TD_ENTERPRISE) #if defined(TD_ENTERPRISE)
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0); int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries; int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
@ -293,7 +303,7 @@ static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
#endif #endif
} }
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { static int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry me = {0}; SMetaEntry me = {0};
int kLen = 0; int kLen = 0;
int vLen = 0; int vLen = 0;
@ -353,7 +363,7 @@ _err:
return code; return code;
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) { static int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
void *pKey = NULL; void *pKey = NULL;
int nKey = 0; int nKey = 0;
void *pData = NULL; void *pData = NULL;
@ -437,6 +447,12 @@ _drop_super_table:
tstrerror(terrno)); tstrerror(terrno));
} }
ret = metaCacheDrop(pMeta, pReq->suid);
if (ret < 0) {
metaError("vgId:%d, failed to drop stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid,
tstrerror(terrno));
}
ret = tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn); ret = tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to drop stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid, metaError("vgId:%d, failed to drop stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid,
@ -507,7 +523,7 @@ static int32_t metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) {
return 0; return 0;
} }
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { static int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0}; SMetaEntry nStbEntry = {0};
TBC *pUidIdxc = NULL; TBC *pUidIdxc = NULL;
@ -675,7 +691,8 @@ _exit:
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
return 0; return 0;
} }
int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
static int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0}; SMetaEntry nStbEntry = {0};
STbDbKey tbDbKey = {0}; STbDbKey tbDbKey = {0};
@ -860,7 +877,8 @@ _err:
return code; return code;
} }
int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
static int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) {
int32_t code = 0; int32_t code = 0;
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0}; SMetaEntry nStbEntry = {0};
@ -1047,7 +1065,7 @@ _err:
return code; return code;
} }
int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) { static int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0}; SMetaEntry me = {0};
SMetaReader mr = {0}; SMetaReader mr = {0};
int32_t ret; int32_t ret;
@ -1216,7 +1234,7 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) { static int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
void *pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int rc = 0; int rc = 0;
@ -1270,7 +1288,7 @@ _exit:
return rc; return rc;
} }
int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) { static int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
int32_t code = 0; int32_t code = 0;
if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS; if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS;
@ -1662,7 +1680,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
return 0; return 0;
} }
// opt ins_tables // opt ins_tables
int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SBtimeIdxKey btimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
@ -1673,13 +1691,14 @@ int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbUpsert(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), NULL, 0, pMeta->txn); return tdbTbUpsert(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), NULL, 0, pMeta->txn);
} }
int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SBtimeIdxKey btimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
} }
return tdbTbDelete(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), pMeta->txn); return tdbTbDelete(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), pMeta->txn);
} }
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
SNcolIdxKey ncolKey = {0}; SNcolIdxKey ncolKey = {0};
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) { if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
@ -2611,144 +2630,6 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
return 0; return 0;
} }
static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry stbEntry = {0};
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
tb_uid_t uid, suid;
int64_t oversion;
const void *pData = NULL;
int nData = 0;
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
} else {
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
}
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
ret = -1;
goto _err;
}
suid = ((SUidIdxVal *)pVal)[0].suid;
STbDbKey tbDbKey = {0};
tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
ret = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
if (ret < 0) {
goto _err;
}
tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry);
if (ret < 0) {
tDecoderClear(&dc);
goto _err;
}
// Get target schema info
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
SSchema *pCol = NULL;
int32_t iCol = 0;
for (;;) {
pCol = NULL;
if (iCol >= pTagSchema->nCols) break;
pCol = &pTagSchema->pSchema[iCol];
if (strcmp(pCol->name, pAlterTbReq->tagName) == 0) break;
iCol++;
}
if (iCol == 0) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
if (pCol == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
/*
* iterator all pTdDbc by uid and version
*/
TBC *pCtbIdxc = NULL;
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL));
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
goto _err;
}
for (;;) {
void *pKey, *pVal;
int nKey, nVal;
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal);
if (rc < 0) break;
if (((SCtbIdxKey *)pKey)->suid != uid) {
tdbFree(pKey);
tdbFree(pVal);
continue;
}
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const void *pTagData = NULL;
int32_t nTagData = 0;
STagVal tagVal = {.cid = pCol->colId};
if (tTagGet((const STag *)pVal, &tagVal)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pCol->type].bytes;
}
} else {
if (!IS_VAR_DATA_TYPE(pCol->type)) {
nTagData = tDataTypes[pCol->type].bytes;
}
}
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
tdbFree(pKey);
tdbFree(pVal);
metaDestroyTagIdxKey(pTagIdxKey);
tdbTbcClose(pCtbIdxc);
goto _err;
}
ret = tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
if (ret < 0) {
metaError("meta/table: failed to upsert tag idx:%s uid:%" PRId64, stbEntry.name, stbEntry.uid);
}
metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
}
tdbTbcClose(pCtbIdxc);
return 0;
_err:
// tDecoderClear(&dc1);
// tDecoderClear(&dc2);
// if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
// tdbTbcClose(pTbDbc);
// tdbTbcClose(pUidIdxc);
return TSDB_CODE_FAILED;
}
typedef struct SMetaPair { typedef struct SMetaPair {
void *key; void *key;
int nkey; int nkey;
@ -2873,7 +2754,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
_err: _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { static int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
// impl later // impl later
SMetaEntry tbEntry = {0}; SMetaEntry tbEntry = {0};
void *pVal = NULL; void *pVal = NULL;
@ -2976,23 +2857,21 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
switch (pReq->action) { switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
return metaAddTableColumn(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
return metaDropTableColumn(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
return metaAlterTableColumnBytes(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
return metaAlterTableColumn(pMeta, version, pReq, pMetaRsp); return metaAlterTableColumnName(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
return metaUpdateTableTagVal(pMeta, version, pReq); return metaUpdateTableTagValue(pMeta, version, pReq);
case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL:
return metaUpdateTableMultiTagVal(pMeta, version, pReq); return metaUpdateTableMultiTagValue(pMeta, version, pReq);
return terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS: case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
return metaUpdateTableOptions(pMeta, version, pReq); return metaUpdateTableOptions2(pMeta, version, pReq);
case TSDB_ALTER_TABLE_ADD_TAG_INDEX:
return metaAddTagIndex(pMeta, version, pReq);
case TSDB_ALTER_TABLE_DROP_TAG_INDEX:
return metaDropTagIndex(pMeta, version, pReq);
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
return metaUpdateTableColCompress(pMeta, version, pReq); return metaUpdateTableColCompress2(pMeta, version, pReq);
default: default:
return terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; return terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
break; break;
@ -3155,7 +3034,7 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
return 0; return 0;
} }
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) { void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
if (pTagIdxKey) taosMemoryFree(pTagIdxKey); if (pTagIdxKey) taosMemoryFree(pTagIdxKey);
} }

File diff suppressed because it is too large Load Diff

View File

@ -129,7 +129,7 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg
pReq.schemaRow = pCfg->schemaRow; pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag; pReq.schemaTag = pCfg->schemaTag;
TAOS_CHECK_EXIT(metaCreateSTable(SMA_META(pSma), ver, &pReq)); TAOS_CHECK_EXIT(metaCreateSuperTable(SMA_META(pSma), ver, &pReq));
} else { } else {
TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT); TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
} }
@ -204,7 +204,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue; continue;
} }
if( taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) { if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
code = terrno; code = terrno;
continue; continue;
} }

View File

@ -1050,7 +1050,7 @@ _exit:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) { int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, const SSchemaWrapper *pSchemaRow) {
int32_t code = 0; int32_t code = 0;
(void)taosThreadMutexLock(&pTsdb->lruMutex); (void)taosThreadMutexLock(&pTsdb->lruMutex);

View File

@ -491,7 +491,7 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return code; return code;
} }
int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) { int32_t vnodePreProcessDropTbMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t size = 0; int32_t size = 0;
@ -514,14 +514,14 @@ int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) {
} }
for (int32_t i = 0; i < receivedBatchReqs.nReqs; ++i) { for (int32_t i = 0; i < receivedBatchReqs.nReqs; ++i) {
SVDropTbReq* pReq = receivedBatchReqs.pReqs + i; SVDropTbReq *pReq = receivedBatchReqs.pReqs + i;
tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, pReq->name); tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, pReq->name);
if (uid == 0) { if (uid == 0) {
vWarn("vgId:%d, preprocess drop ctb: %s not found", TD_VID(pVnode), pReq->name); vWarn("vgId:%d, preprocess drop ctb: %s not found", TD_VID(pVnode), pReq->name);
continue; continue;
} }
pReq->uid = uid; pReq->uid = uid;
vDebug("vgId:%d %s for: %s, uid: %"PRId64, TD_VID(pVnode), __func__, pReq->name, pReq->uid); vDebug("vgId:%d %s for: %s, uid: %" PRId64, TD_VID(pVnode), __func__, pReq->name, pReq->uid);
if (taosArrayPush(sentBatchReqs.pArray, pReq) == NULL) { if (taosArrayPush(sentBatchReqs.pArray, pReq) == NULL) {
code = terrno; code = terrno;
goto _exit; goto _exit;
@ -1038,7 +1038,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
if (ttlReq.nUids > 0) { if (ttlReq.nUids > 0) {
int32_t code = metaDropTables(pVnode->pMeta, ttlReq.pTbUids); int32_t code = metaDropMultipleTables(pVnode->pMeta, ver, ttlReq.pTbUids);
if (code) return code; if (code) return code;
code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false); code = tqUpdateTbUidList(pVnode->pTq, ttlReq.pTbUids, false);
@ -1150,7 +1150,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto _err; goto _err;
} }
code = metaCreateSTable(pVnode->pMeta, ver, &req); code = metaCreateSuperTable(pVnode->pMeta, ver, &req);
if (code) { if (code) {
pRsp->code = code; pRsp->code = code;
goto _err; goto _err;
@ -1238,7 +1238,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} }
// do create table // do create table
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) { if (metaCreateTable2(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
} else { } else {
@ -1344,7 +1344,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq,
return code; return code;
} }
code = metaAlterSTable(pVnode->pMeta, ver, &req); code = metaAlterSuperTable(pVnode->pMeta, ver, &req);
if (code) { if (code) {
pRsp->code = code; pRsp->code = code;
tDecoderClear(&dc); tDecoderClear(&dc);
@ -1376,7 +1376,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, i
// process request // process request
tbUidList = taosArrayInit(8, sizeof(int64_t)); tbUidList = taosArrayInit(8, sizeof(int64_t));
if (tbUidList == NULL) goto _exit; if (tbUidList == NULL) goto _exit;
if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) { if (metaDropSuperTable(pVnode->pMeta, ver, &req) < 0) {
rcode = terrno; rcode = terrno;
goto _exit; goto _exit;
} }
@ -1490,7 +1490,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
tb_uid_t tbUid = 0; tb_uid_t tbUid = 0;
/* code */ /* code */
ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid); ret = metaDropTable2(pVnode->pMeta, ver, pDropTbReq);
if (ret < 0) { if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS; dropTbRsp.code = TSDB_CODE_SUCCESS;
@ -1966,7 +1966,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
// create table // create table
if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) { if (metaCreateTable2(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
// create table success // create table success
if (newTbUids == NULL && if (newTbUids == NULL &&
@ -2433,7 +2433,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe
return terrno = TSDB_CODE_INVALID_MSG; return terrno = TSDB_CODE_INVALID_MSG;
} }
code = metaAddIndexToSTable(pVnode->pMeta, ver, &req); code = metaAddIndexToSuperTable(pVnode->pMeta, ver, &req);
if (code) { if (code) {
pRsp->code = code; pRsp->code = code;
goto _err; goto _err;
@ -2458,7 +2458,7 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
return code; return code;
} }
code = metaDropIndexFromSTable(pVnode->pMeta, ver, &req); code = metaDropIndexFromSuperTable(pVnode->pMeta, ver, &req);
if (code) { if (code) {
pRsp->code = code; pRsp->code = code;
return code; return code;
@ -2584,4 +2584,3 @@ _OVER:
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
#endif #endif

View File

@ -88,6 +88,11 @@ void tdbTxnCloseImpl(TXN *pTxn);
// other // other
void tdbFree(void *); void tdbFree(void *);
#define tdbFreeClear(p) \
do { \
tdbFree(p); \
p = NULL; \
} while (0)
typedef struct hashset_st *hashset_t; typedef struct hashset_st *hashset_t;