meta/split: trim invalidated uids
This commit is contained in:
parent
99d6c5cfa0
commit
fb2f367aac
|
@ -107,10 +107,10 @@ struct SQueryNode {
|
|||
typedef SVCreateTbReq STbCfg;
|
||||
typedef SVCreateTSmaReq SSmaCfg;
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(void *pVnode);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
|
||||
SMTbCursor* metaOpenTbCursor(void* pVnode);
|
||||
void metaCloseTbCursor(SMTbCursor* pTbCur);
|
||||
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
|
||||
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -146,6 +146,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
|
|||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
|
||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
|
||||
int32_t metaTrimTables(SMeta* pMeta);
|
||||
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
|
||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||
|
@ -155,7 +156,7 @@ int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
|||
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||
|
||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
|
||||
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid);
|
||||
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
||||
|
||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||
|
@ -175,7 +176,7 @@ void* metaGetIdx(SMeta* pMeta);
|
|||
void* metaGetIvtIdx(SMeta* pMeta);
|
||||
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||
|
||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||
void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
|
||||
|
||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||
|
@ -491,7 +492,6 @@ struct SCompactInfo {
|
|||
|
||||
void initStorageAPI(SStorageAPI* pAPI);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -848,7 +848,52 @@ static void metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
|||
metaULock(pMeta);
|
||||
}
|
||||
|
||||
int metaTrimTables(SMeta *pMeta, int64_t version) {
|
||||
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
|
||||
int32_t code = 0;
|
||||
// 1, tranverse table's
|
||||
// 2, validate table name using vnodeValidateTableHash
|
||||
// 3, push invalidated table's uid into uidList
|
||||
|
||||
TBC *pCur;
|
||||
code = tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
|
||||
if (code < 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToFirst(pCur);
|
||||
if (code) {
|
||||
tdbTbcClose(pCur);
|
||||
return code;
|
||||
}
|
||||
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
|
||||
while (1) {
|
||||
int32_t ret = tdbTbcNext(pCur, NULL, NULL, &pData, &nData);
|
||||
if (ret < 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMetaEntry me = {0};
|
||||
SDecoder dc = {0};
|
||||
tDecoderInit(&dc, pData, nData);
|
||||
metaDecodeEntry(&dc, &me);
|
||||
if (me.type == TSDB_CHILD_TABLE) {
|
||||
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name);
|
||||
if (TSDB_CODE_VND_HASH_MISMATCH == ret) {
|
||||
taosArrayPush(uidList, &me.uid);
|
||||
}
|
||||
}
|
||||
tDecoderClear(&dc);
|
||||
}
|
||||
tdbFree(pData);
|
||||
tdbTbcClose(pCur);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t metaTrimTables(SMeta *pMeta) {
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||
|
@ -856,7 +901,7 @@ int metaTrimTables(SMeta *pMeta, int64_t version) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// code = metaFilterTableByHash(pMeta, /*ttl, */ tbUids);
|
||||
code = metaFilterTableByHash(pMeta, tbUids);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
|
|
@ -316,8 +316,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
return -1;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
ver);
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
|
||||
|
||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||
ASSERT(pVnode->state.applied + 1 == ver);
|
||||
|
@ -1479,6 +1478,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
|
|||
pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
|
||||
|
||||
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
||||
code = metaTrimTables(pVnode->pMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1492,8 +1492,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pR
|
|||
|
||||
code = vnodeConsolidateAlterHashRange(pVnode, ver);
|
||||
if (code < 0) {
|
||||
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(),
|
||||
ver);
|
||||
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(), ver);
|
||||
goto _exit;
|
||||
}
|
||||
pVnode->config.hashChange = false;
|
||||
|
|
Loading…
Reference in New Issue