Merge pull request #21973 from luckeverda/fix/TD-25074-main
fix/TD-25074 for main
This commit is contained in:
commit
0865b1a828
|
@ -79,16 +79,18 @@ typedef struct {
|
||||||
TXN* pTxn;
|
TXN* pTxn;
|
||||||
} STtlDelTtlCtx;
|
} STtlDelTtlCtx;
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
||||||
int ttlMgrClose(STtlManger* pTtlMgr);
|
void ttlMgrClose(STtlManger* pTtlMgr);
|
||||||
int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta);
|
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta);
|
bool ttlMgrNeedUpgrade(TDB* pEnv);
|
||||||
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
||||||
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
||||||
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
||||||
|
|
||||||
|
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
||||||
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
|
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore;
|
||||||
#define META_BEGIN_HEAP_NIL 2
|
#define META_BEGIN_HEAP_NIL 2
|
||||||
|
|
||||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
||||||
|
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
|
||||||
int metaClose(SMeta** pMeta);
|
int metaClose(SMeta** pMeta);
|
||||||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||||
TXN* metaGetTxn(SMeta* pMeta);
|
TXN* metaGetTxn(SMeta* pMeta);
|
||||||
|
|
|
@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdbCommit(pMeta->pEnv, pMeta->txn);
|
tdbCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen
|
||||||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||||
|
|
||||||
|
static void metaCleanup(SMeta **ppMeta);
|
||||||
|
|
||||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -180,51 +182,43 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
metaCleanup(&pMeta);
|
||||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
|
||||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
|
||||||
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
|
||||||
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
|
||||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
|
||||||
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
|
||||||
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
|
||||||
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
|
||||||
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
|
||||||
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
|
||||||
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
|
||||||
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
|
||||||
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
|
||||||
metaDestroyLock(pMeta);
|
|
||||||
taosMemoryFree(pMeta);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaClose(SMeta **ppMeta) {
|
int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
SMeta *pMeta = *ppMeta;
|
SMeta *pMeta = *ppMeta;
|
||||||
if (pMeta) {
|
|
||||||
if (pMeta->pEnv) metaAbort(pMeta);
|
|
||||||
if (pMeta->pCache) metaCacheClose(pMeta);
|
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
|
||||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
|
||||||
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
|
||||||
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
|
||||||
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
|
||||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
|
||||||
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
|
||||||
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
|
||||||
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
|
||||||
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
|
||||||
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
|
||||||
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
|
||||||
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
|
||||||
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
|
||||||
metaDestroyLock(pMeta);
|
|
||||||
|
|
||||||
taosMemoryFreeClear(*ppMeta);
|
if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
|
||||||
|
code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = metaCommit(pMeta, pMeta->txn);
|
||||||
|
if (code < 0) {
|
||||||
|
metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
metaCleanup(ppMeta);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int metaClose(SMeta **ppMeta) {
|
||||||
|
metaCleanup(ppMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,6 +264,32 @@ int32_t metaULock(SMeta *pMeta) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void metaCleanup(SMeta **ppMeta) {
|
||||||
|
SMeta *pMeta = *ppMeta;
|
||||||
|
if (pMeta) {
|
||||||
|
if (pMeta->pEnv) metaAbort(pMeta);
|
||||||
|
if (pMeta->pCache) metaCacheClose(pMeta);
|
||||||
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
|
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||||
|
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
|
||||||
|
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
|
||||||
|
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||||
|
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
|
||||||
|
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||||
|
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
||||||
|
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
||||||
|
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
|
||||||
|
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
||||||
|
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
||||||
|
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
|
||||||
|
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
|
||||||
|
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
|
||||||
|
metaDestroyLock(pMeta);
|
||||||
|
|
||||||
|
taosMemoryFreeClear(*ppMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||||
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
|
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
|
||||||
STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2;
|
STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2;
|
||||||
|
|
|
@ -21,6 +21,10 @@ typedef struct {
|
||||||
SMeta *pMeta;
|
SMeta *pMeta;
|
||||||
} SConvertData;
|
} SConvertData;
|
||||||
|
|
||||||
|
static void ttlMgrCleanup(STtlManger *pTtlMgr);
|
||||||
|
|
||||||
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
|
||||||
|
|
||||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
|
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
|
||||||
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
|
@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx";
|
||||||
const char *ttlV1Tbname = "ttlv1.idx";
|
const char *ttlV1Tbname = "ttlv1.idx";
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
int ret;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
*ppTtlMgr = NULL;
|
*ppTtlMgr = NULL;
|
||||||
|
|
||||||
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
||||||
if (pTtlMgr == NULL) {
|
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbExist(ttlTbname, pEnv)) {
|
|
||||||
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback);
|
|
||||||
if (ret < 0) {
|
|
||||||
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
||||||
|
|
||||||
tdbOsFree(pTtlMgr);
|
tdbOsFree(pTtlMgr);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
|
|
||||||
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
|
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
|
||||||
|
|
||||||
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("failed to fill hash since %s", tstrerror(terrno));
|
||||||
|
ttlMgrCleanup(pTtlMgr);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t endNs = taosGetTimestampNs();
|
||||||
|
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||||
|
endNs - startNs);
|
||||||
|
|
||||||
*ppTtlMgr = pTtlMgr;
|
*ppTtlMgr = pTtlMgr;
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrClose(STtlManger *pTtlMgr) {
|
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
|
||||||
taosHashCleanup(pTtlMgr->pTtlCache);
|
|
||||||
taosHashCleanup(pTtlMgr->pDirtyUids);
|
bool ttlMgrNeedUpgrade(TDB *pEnv) {
|
||||||
tdbTbClose(pTtlMgr->pTtlIdx);
|
bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
|
||||||
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
if (needUpgrade) {
|
||||||
tdbOsFree(pTtlMgr);
|
metaInfo("find ttl idx in old version , will convert");
|
||||||
return 0;
|
}
|
||||||
|
return needUpgrade;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
|
int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
metaInfo("ttl mgr start open");
|
SMeta *meta = (SMeta *)pMeta;
|
||||||
int ret;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
metaInfo("ttl mgr start upgrade");
|
||||||
|
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
SMeta *meta = (SMeta *)pMeta;
|
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTtlMgr->pOldTtlIdx) {
|
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
||||||
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
if (ret < 0) {
|
||||||
if (ret < 0) {
|
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
||||||
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
goto _out;
|
||||||
goto _out;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
|
||||||
|
|
||||||
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
|
||||||
pTtlMgr->pOldTtlIdx = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
|
@ -111,13 +120,23 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endNs = taosGetTimestampNs();
|
int64_t endNs = taosGetTimestampNs();
|
||||||
|
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||||
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
|
||||||
endNs - startNs);
|
endNs - startNs);
|
||||||
_out:
|
_out:
|
||||||
|
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
||||||
|
pTtlMgr->pOldTtlIdx = NULL;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
||||||
|
taosHashCleanup(pTtlMgr->pTtlCache);
|
||||||
|
taosHashCleanup(pTtlMgr->pDirtyUids);
|
||||||
|
tdbTbClose(pTtlMgr->pTtlIdx);
|
||||||
|
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
||||||
|
tdbOsFree(pTtlMgr);
|
||||||
|
}
|
||||||
|
|
||||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
||||||
if (ttlDays <= 0) return;
|
if (ttlDays <= 0) return;
|
||||||
|
|
||||||
|
@ -205,7 +224,7 @@ _out:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||||
SMeta *meta = pMeta;
|
SMeta *meta = pMeta;
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl start.");
|
metaInfo("ttlMgr convert ttl start.");
|
||||||
|
|
|
@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncCfg *pCfg = &info.config.syncCfg;
|
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||||
|
|
||||||
pCfg->replicaNum = 0;
|
pCfg->replicaNum = 0;
|
||||||
pCfg->totalReplicaNum = 0;
|
pCfg->totalReplicaNum = 0;
|
||||||
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
|
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
|
||||||
|
@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
|
||||||
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
|
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
|
||||||
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
|
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
|
||||||
|
|
||||||
info.config.syncCfg = *pCfg;
|
info.config.syncCfg = *pCfg;
|
||||||
|
@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
|
||||||
|
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
// open tsdb
|
// open tsdb
|
||||||
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
|
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
|
||||||
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
|
Loading…
Reference in New Issue