diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index db285dc124..0bfdf33019 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -107,10 +107,10 @@ struct SQueryNode { typedef SVCreateTbReq STbCfg; typedef SVCreateTSmaReq SSmaCfg; -SMTbCursor *metaOpenTbCursor(void *pVnode); -void metaCloseTbCursor(SMTbCursor *pTbCur); -int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); +SMTbCursor* metaOpenTbCursor(void* pVnode); +void metaCloseTbCursor(SMTbCursor* pTbCur); +int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType); +int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType); #endif @@ -146,6 +146,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); +int32_t metaTrimTables(SMeta* pMeta); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); @@ -154,8 +155,8 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaAlterCache(SMeta* pMeta, int32_t nPage); -int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); -int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid); +int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); +int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid); int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); @@ -175,7 +176,7 @@ void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); -void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); +void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -491,7 +492,6 @@ struct SCompactInfo { void initStorageAPI(SStorageAPI* pAPI); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4f9f0c688e..20cf423182 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -848,7 +848,52 @@ static void metaDropTables(SMeta *pMeta, SArray *tbUids) { metaULock(pMeta); } -int metaTrimTables(SMeta *pMeta, int64_t version) { +static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) { + int32_t code = 0; + // 1, tranverse table's + // 2, validate table name using vnodeValidateTableHash + // 3, push invalidated table's uid into uidList + + TBC *pCur; + code = tdbTbcOpen(pMeta->pTbDb, &pCur, NULL); + if (code < 0) { + return code; + } + + code = tdbTbcMoveToFirst(pCur); + if (code) { + tdbTbcClose(pCur); + return code; + } + + void *pData = NULL; + int nData = 0; + + while (1) { + int32_t ret = tdbTbcNext(pCur, NULL, NULL, &pData, &nData); + if (ret < 0) { + break; + } + + SMetaEntry me = {0}; + SDecoder dc = {0}; + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &me); + if (me.type == TSDB_CHILD_TABLE) { + int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name); + if (TSDB_CODE_VND_HASH_MISMATCH == ret) { + taosArrayPush(uidList, &me.uid); + } + } + tDecoderClear(&dc); + } + tdbFree(pData); + tdbTbcClose(pCur); + + return 0; +} + +int32_t metaTrimTables(SMeta *pMeta) { int32_t code = 0; SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); @@ -856,7 +901,7 @@ int metaTrimTables(SMeta *pMeta, int64_t version) { return TSDB_CODE_OUT_OF_MEMORY; } - // code = metaFilterTableByHash(pMeta, /*ttl, */ tbUids); + code = metaFilterTableByHash(pMeta, tbUids); if (code != 0) { goto end; } @@ -1027,7 +1072,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1); metaUidCacheClear(pMeta, e.ctbEntry.suid); - metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); + metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) @@ -1039,7 +1084,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { metaStatsCacheDrop(pMeta, uid); metaUidCacheClear(pMeta, uid); - metaTbGroupCacheClear(pMeta, uid); + metaTbGroupCacheClear(pMeta, uid); --pMeta->pVnode->config.vndStats.numOfSTables; } @@ -1460,7 +1505,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn); metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid); - metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); + metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaULock(pMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b950437f23..c3fb5e5ad4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -245,11 +245,11 @@ _exit: static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; - int32_t size; - int32_t ret; - uint8_t *pCont; - SEncoder *pCoder = &(SEncoder){0}; - SDeleteRes res = {0}; + int32_t size; + int32_t ret; + uint8_t *pCont; + SEncoder *pCoder = &(SEncoder){0}; + SDeleteRes res = {0}; SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; initStorageAPI(&handle.api); @@ -316,8 +316,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg return -1; } - vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), - ver); + vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applied + 1 == ver); @@ -1479,6 +1478,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) { pVnode->config.hashBegin, pVnode->config.hashEnd, ver); // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd] + code = metaTrimTables(pVnode->pMeta); return code; } @@ -1492,8 +1492,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pR code = vnodeConsolidateAlterHashRange(pVnode, ver); if (code < 0) { - vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), - ver); + vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver); goto _exit; } pVnode->config.hashChange = false;