Merge pull request #21386 from taosdata/fix/TD-23549

fix(meta): split meta
This commit is contained in:
wade zhang 2023-06-05 17:47:19 +08:00 committed by GitHub
commit 11321aa279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 28 deletions

View File

@ -107,10 +107,10 @@ struct SQueryNode {
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg; typedef SVCreateTSmaReq SSmaCfg;
SMTbCursor *metaOpenTbCursor(void *pVnode); SMTbCursor* metaOpenTbCursor(void* pVnode);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor* pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
#endif #endif
@ -146,6 +146,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
@ -155,7 +156,7 @@ int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage); int metaAlterCache(SMeta* pMeta, int32_t nPage);
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid); int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
@ -175,7 +176,7 @@ void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
@ -491,7 +492,6 @@ struct SCompactInfo {
void initStorageAPI(SStorageAPI* pAPI); void initStorageAPI(SStorageAPI* pAPI);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -838,22 +838,96 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
return 0; return 0;
} }
static void metaDropTables(SMeta *pMeta, SArray *tbUids) {
metaWLock(pMeta);
for (int i = 0; i < TARRAY_SIZE(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, uid, NULL);
metaDebug("batch drop table:%" PRId64, uid);
}
metaULock(pMeta);
}
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
int32_t code = 0;
// 1, tranverse table's
// 2, validate table name using vnodeValidateTableHash
// 3, push invalidated table's uid into uidList
TBC *pCur;
code = tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
if (code < 0) {
return code;
}
code = tdbTbcMoveToFirst(pCur);
if (code) {
tdbTbcClose(pCur);
return code;
}
void *pData = NULL, *pKey = NULL;
int nData = 0, nKey = 0;
while (1) {
int32_t ret = tdbTbcNext(pCur, &pKey, &nKey, &pData, &nData);
if (ret < 0) {
break;
}
SMetaEntry me = {0};
SDecoder dc = {0};
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &me);
if (me.type != TSDB_SUPER_TABLE) {
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name);
if (TSDB_CODE_VND_HASH_MISMATCH == ret) {
taosArrayPush(uidList, &me.uid);
}
}
tDecoderClear(&dc);
}
tdbFree(pData);
tdbFree(pKey);
tdbTbcClose(pCur);
return 0;
}
int32_t metaTrimTables(SMeta *pMeta) {
int32_t code = 0;
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = metaFilterTableByHash(pMeta, tbUids);
if (code != 0) {
goto end;
}
if (TARRAY_SIZE(tbUids) == 0) {
goto end;
}
metaDropTables(pMeta, tbUids);
end:
taosArrayDestroy(tbUids);
return code;
}
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
int ret = metaTtlSmaller(pMeta, ttl, tbUids); int ret = metaTtlSmaller(pMeta, ttl, tbUids);
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
if (taosArrayGetSize(tbUids) == 0) { if (TARRAY_SIZE(tbUids) == 0) {
return 0; return 0;
} }
metaWLock(pMeta); metaDropTables(pMeta, tbUids);
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, *uid, NULL);
metaDebug("ttl drop table:%" PRId64, *uid);
}
metaULock(pMeta);
return 0; return 0;
} }

View File

@ -316,8 +316,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
return -1; return -1;
} }
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
ver);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applied + 1 == ver); ASSERT(pVnode->state.applied + 1 == ver);
@ -1479,6 +1478,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
pVnode->config.hashBegin, pVnode->config.hashEnd, ver); pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd] // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
code = metaTrimTables(pVnode->pMeta);
return code; return code;
} }
@ -1492,8 +1492,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pR
code = vnodeConsolidateAlterHashRange(pVnode, ver); code = vnodeConsolidateAlterHashRange(pVnode, ver);
if (code < 0) { if (code < 0) {
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
ver);
goto _exit; goto _exit;
} }
pVnode->config.hashChange = false; pVnode->config.hashChange = false;