diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index bf3b897c6f..a3d3ceab24 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -79,16 +79,18 @@ typedef struct { TXN* pTxn; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); -int ttlMgrClose(STtlManger* pTtlMgr); -int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); +void ttlMgrClose(STtlManger* pTtlMgr); +int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); -int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); -int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); +bool ttlMgrNeedUpgrade(TDB* pEnv); +int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta); int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx); int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); + +int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a9541d8c47..cbf0933358 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore; #define META_BEGIN_HEAP_NIL 2 int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); +int metaUpgrade(SVnode* pVnode, SMeta** ppMeta); int metaClose(SMeta** pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); TXN* metaGetTxn(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 1fa5b9c1e9..d262567953 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) { return -1; } - if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) { - return -1; - } - tdbCommit(pMeta->pEnv, pMeta->txn); return 0; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index fb17aff318..511cc8d6ec 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } +static void metaCleanup(SMeta **ppMeta); + int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { SMeta *pMeta = NULL; int ret; @@ -180,51 +182,43 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { return 0; _err: - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - taosMemoryFree(pMeta); + metaCleanup(&pMeta); return -1; } -int metaClose(SMeta **ppMeta) { +int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) { + int code = TSDB_CODE_SUCCESS; SMeta *pMeta = *ppMeta; - if (pMeta) { - if (pMeta->pEnv) metaAbort(pMeta); - if (pMeta->pCache) metaCacheClose(pMeta); - if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); - if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); - if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); - if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); - if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); - if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); - if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); - if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); - if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); - if (pMeta->pEnv) tdbClose(pMeta->pEnv); - metaDestroyLock(pMeta); - taosMemoryFreeClear(*ppMeta); + if (ttlMgrNeedUpgrade(pMeta->pEnv)) { + code = metaBegin(pMeta, META_BEGIN_HEAP_OS); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + + code = metaCommit(pMeta, pMeta->txn); + if (code < 0) { + metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } } + return TSDB_CODE_SUCCESS; + +_err: + metaCleanup(ppMeta); + return code; +} + +int metaClose(SMeta **ppMeta) { + metaCleanup(ppMeta); return 0; } @@ -270,6 +264,32 @@ int32_t metaULock(SMeta *pMeta) { return ret; } +static void metaCleanup(SMeta **ppMeta) { + SMeta *pMeta = *ppMeta; + if (pMeta) { + if (pMeta->pEnv) metaAbort(pMeta); + if (pMeta->pCache) metaCacheClose(pMeta); + if (pMeta->pIdx) metaCloseIdx(pMeta); + if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); + if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); + if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx); + if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); + if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr); + if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); + if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); + if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); + if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx); + if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); + if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); + if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb); + if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb); + if (pMeta->pEnv) tdbClose(pMeta->pEnv); + metaDestroyLock(pMeta); + + taosMemoryFreeClear(*ppMeta); + } +} + static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1; STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index af4827a9c7..c6cb826149 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -21,6 +21,10 @@ typedef struct { SMeta *pMeta; } SConvertData; +static void ttlMgrCleanup(STtlManger *pTtlMgr); + +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta); + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); @@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { - int ret; + int ret = TSDB_CODE_SUCCESS; + int64_t startNs = taosGetTimestampNs(); *ppTtlMgr = NULL; STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); - if (pTtlMgr == NULL) { - return -1; - } - - if (tdbTbExist(ttlTbname, pEnv)) { - ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback); - if (ret < 0) { - metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); - return ret; - } - } + if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY; ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); - tdbOsFree(pTtlMgr); return ret; } @@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { taosThreadRwlockInit(&pTtlMgr->lock, NULL); + ret = ttlMgrFillCache(pTtlMgr); + if (ret < 0) { + metaError("failed to fill hash since %s", tstrerror(terrno)); + ttlMgrCleanup(pTtlMgr); + return ret; + } + + int64_t endNs = taosGetTimestampNs(); + metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + endNs - startNs); + *ppTtlMgr = pTtlMgr; - return 0; + return TSDB_CODE_SUCCESS; } -int ttlMgrClose(STtlManger *pTtlMgr) { - taosHashCleanup(pTtlMgr->pTtlCache); - taosHashCleanup(pTtlMgr->pDirtyUids); - tdbTbClose(pTtlMgr->pTtlIdx); - taosThreadRwlockDestroy(&pTtlMgr->lock); - tdbOsFree(pTtlMgr); - return 0; +void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); } + +bool ttlMgrNeedUpgrade(TDB *pEnv) { + bool needUpgrade = tdbTbExist(ttlTbname, pEnv); + if (needUpgrade) { + metaInfo("find ttl idx in old version , will convert"); + } + return needUpgrade; } -int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { - metaInfo("ttl mgr start open"); - int ret; +int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { + SMeta *meta = (SMeta *)pMeta; + int ret = TSDB_CODE_SUCCESS; + + if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS; + + metaInfo("ttl mgr start upgrade"); int64_t startNs = taosGetTimestampNs(); - SMeta *meta = (SMeta *)pMeta; + ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0); + if (ret < 0) { + metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); + goto _out; + } - if (pTtlMgr->pOldTtlIdx) { - ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); - if (ret < 0) { - metaError("failed to convert ttl index since %s", tstrerror(terrno)); - goto _out; - } + ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); + if (ret < 0) { + metaError("failed to convert ttl index since %s", tstrerror(terrno)); + goto _out; + } - ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); - if (ret < 0) { - metaError("failed to drop old ttl index since %s", tstrerror(terrno)); - goto _out; - } - - tdbTbClose(pTtlMgr->pOldTtlIdx); - pTtlMgr->pOldTtlIdx = NULL; + ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); + if (ret < 0) { + metaError("failed to drop old ttl index since %s", tstrerror(terrno)); + goto _out; } ret = ttlMgrFillCache(pTtlMgr); @@ -111,13 +120,23 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { } int64_t endNs = taosGetTimestampNs(); - - metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), + metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: + tdbTbClose(pTtlMgr->pOldTtlIdx); + pTtlMgr->pOldTtlIdx = NULL; + return ret; } +static void ttlMgrCleanup(STtlManger *pTtlMgr) { + taosHashCleanup(pTtlMgr->pTtlCache); + taosHashCleanup(pTtlMgr->pDirtyUids); + tdbTbClose(pTtlMgr->pTtlIdx); + taosThreadRwlockDestroy(&pTtlMgr->lock); + tdbOsFree(pTtlMgr); +} + static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { if (ttlDays <= 0) return; @@ -205,7 +224,7 @@ _out: return ret; } -int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { +static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { SMeta *meta = pMeta; metaInfo("ttlMgr convert ttl start."); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 583df15533..c794c7ebd6 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p } SSyncCfg *pCfg = &info.config.syncCfg; - + pCfg->replicaNum = 0; pCfg->totalReplicaNum = 0; memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); @@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); info.config.syncCfg = *pCfg; @@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { goto _err; } + if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) { + vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno)); + } + // open tsdb if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));