diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ac6ce82ffb..a05cd907b0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -873,7 +873,8 @@ int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); typedef struct { - int32_t timestamp; + int64_t timestamp; // unit: millisecond + int32_t maxSpeed; // 0 no limit, unit: bit/s } SVTrimDbReq; int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0990815024..8c4c552db2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2706,7 +2706,8 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; + if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1; + if (tEncodeI32(&encoder, pReq->maxSpeed) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2719,7 +2720,8 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->maxSpeed) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 39ec3484db..ccc4c8293a 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1389,7 +1389,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()}; + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 5}; // TODO: use specified maxSpeed int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -1413,7 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { if (code != 0) { mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); } else { - mDebug("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp); + mDebug("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64, pVgroup->vgId, trimReq.timestamp); } sdbRelease(pSdb, pVgroup); } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 817089f237..66df364b4b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -58,6 +58,7 @@ typedef struct SDelFWriter SDelFWriter; typedef struct SDelFReader SDelFReader; typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; +typedef struct STsdbTrimHdl STsdbTrimHdl; typedef struct SRowMerger SRowMerger; typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; @@ -243,6 +244,7 @@ int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS); int32_t tDFileSetCmprFn(const void *p1, const void *p2); +int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid); int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); @@ -318,6 +320,12 @@ struct STsdbFS { SArray *aDFileSet; // SArray }; +struct STsdbTrimHdl { + volatile int8_t state; // 0 idle 1 in use + volatile int32_t maxRetentFid; + volatile int32_t minCommitFid; +}; + struct STsdb { char *path; SVnode *pVnode; @@ -326,6 +334,7 @@ struct STsdb { SMemTable *mem; SMemTable *imem; STsdbFS fs; + STsdbTrimHdl trimHdl; SLRUCache *lruCache; TdThreadMutex lruMutex; }; @@ -559,6 +568,9 @@ struct SDFileSet { SSttFile *aSttF[TSDB_MAX_STT_TRIGGER]; }; +#define SET_DFSET_EXPIRED(d) ((d)->diskId.id = -1) +#define IS_DFSET_EXPIRED(d) ((d)->diskId.id == -1) + struct SRowIter { TSDBROW *pRow; STSchema *pTSchema; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 54dcb66c21..d9acb6da21 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -58,7 +58,7 @@ typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; -typedef struct STrimDbHandle STrimDbHandle; +typedef struct SVTrimDbHdl SVTrimDbHdl; typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; @@ -145,7 +145,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); +int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int32_t maxSpeed); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, @@ -200,7 +200,7 @@ int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); -int32_t smaDoRetention(SSma* pSma, int64_t now); +int32_t smaDoRetention(SSma* pSma, int64_t now, int32_t maxSpeed); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); @@ -302,7 +302,7 @@ struct STsdbKeepCfg { int32_t keep1; int32_t keep2; }; -struct STrimDbHandle { +struct SVTrimDbHdl { volatile int8_t state; // 0 not in trim, 1 in trim }; @@ -329,7 +329,7 @@ struct SVnode { bool restored; tsem_t syncSem; SQHandle* pQuery; - STrimDbHandle trimDbH; + SVTrimDbHdl trimDbH; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c6ec31cc21..d6c3e774ce 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -661,9 +661,10 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { * * @param pSma * @param now + * @param maxSpeed * @return int32_t */ -int32_t smaDoRetention(SSma *pSma, int64_t now) { +int32_t smaDoRetention(SSma *pSma, int64_t now, int32_t maxSpeed) { int32_t code = TSDB_CODE_SUCCESS; if (!VND_IS_RSMA(pSma->pVnode)) { return code; @@ -671,7 +672,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now, maxSpeed); if (code) goto _end; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4..0a2d37bdbc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -756,6 +756,30 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + + if (pTsdb->imem->nRow > 0) { + int32_t minCommitFid = tsdbKeyFid(pTsdb->imem->minKey, pCommitter->minutes, pCommitter->precision); + int32_t nLoops = 0; + + _wait_retention_end: + while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { + if (++nLoops > 1000) { + nLoops = 0; + sched_yield(); + } + } + if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { + if (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { + atomic_store_8(&pTsdb->trimHdl.state, 0); + goto _wait_retention_end; + } + atomic_store_32(&pTsdb->trimHdl.minCommitFid, minCommitFid); + atomic_store_8(&pTsdb->trimHdl.state, 0); + } else { + goto _wait_retention_end; + } + } + code = tsdbFSCopy(pTsdb, &pCommitter->fs); if (code) goto _err; @@ -962,14 +986,23 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; + bool inTrim = atomic_load_8(&pTsdb->pVnode->trimDbH.state); ASSERT(eno == 0); + if(inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock); + + int64_t startTime = taosGetTimestampMs(); code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - if (code) goto _err; + if (code) { + if(inTrim) taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + int64_t endTime = taosGetTimestampMs(); + tsdbInfo("vgId:%d, tsdb end commit - commit1 fsSize:%d cost: %" PRIi64 " ms", TD_VID(pTsdb->pVnode), (int32_t)taosArrayGetSize(pCommitter->fs.aDFileSet), endTime - startTime); // lock - taosThreadRwlockWrlock(&pTsdb->rwLock); + if(!inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock); // commit or rollback code = tsdbFSCommit2(pTsdb, &pCommitter->fs); @@ -977,6 +1010,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; } + tsdbInfo("vgId:%d, tsdb end commit - commit2 cost: %" PRIi64 " ms", TD_VID(pTsdb->pVnode), taosGetTimestampMs() - endTime); pTsdb->imem = NULL; @@ -986,6 +1020,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { tsdbUnrefMemTable(pMemTable); tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); + atomic_store_32(&pTsdb->trimHdl.minCommitFid, INT32_MAX); // if (pCommitter->toMerge) { // code = tsdbMerge(pTsdb); @@ -996,6 +1031,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { return code; _err: + atomic_store_32(&pTsdb->trimHdl.minCommitFid, INT32_MAX); tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index bd3ebfdcd4..45d28f4135 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -711,7 +711,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { _exit: return code; } - +#if 0 int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t code = 0; char tfname[TSDB_FILENAME_LEN]; @@ -1045,6 +1045,447 @@ _err: return code; } +#endif + +#if 1 + +/** + * @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew. + * + * @param pTsdb + * @param pFS + * @param pFSNew + * @param maxFid + * @return int32_t + */ +int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid) { + int32_t code = 0; + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN]; + + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pFS->aDFileSet); + int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); + SDFileSet fSet; + int8_t sameDisk; + + if (iOld >= nOld && iNew >= nNew) break; + + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pFS->aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; + + if (pSetNew && (pSetNew->fid > maxFid)) break; + + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + if (IS_DFSET_EXPIRED(pSetNew)) goto _remove_old; + goto _merge_migrate; + } else if (pSetOld->fid < pSetNew->fid) { + ++iOld; + } else { + ++iNew; + } + continue; + } else { + break; + } + + _merge_migrate: + sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + + ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID); + ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size); + ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset); + ASSERT(!sameDisk); + + // head + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; + + // data + ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size); + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; + + // sma + ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size); + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; + + // stt + ASSERT(pSetOld->nSttF == pSetNew->nSttF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { + ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); + + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } + + // set diskId + pSetOld->diskId = pSetNew->diskId; + + iOld++; + iNew++; + continue; + + _remove_old: + taosMemoryFree(pSetOld->pHeadF); + taosMemoryFree(pSetOld->pDataF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { + taosMemoryFree(pSetOld->aSttF[iStt]); + } + taosMemoryFree(pSetOld->pSmaF); + taosArrayRemove(pFS->aDFileSet, iOld); + iNew++; + continue; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb fs upd/del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + + +int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { + int32_t code = 0; + char tfname[TSDB_FILENAME_LEN]; + char fname[TSDB_FILENAME_LEN]; + + snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + + // gnrt CURRENT.t + code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname); + if (code) goto _err; + + // rename + code = taosRenameFile(tfname, fname); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { + int32_t code = 0; + int32_t nRef; + char fname[TSDB_FILENAME_LEN]; + + // del + if (pFSNew->pDelFile) { + SDelFile *pDelFile = pTsdb->fs.pDelFile; + + if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) { + pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); + if (pTsdb->fs.pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + *pTsdb->fs.pDelFile = *pFSNew->pDelFile; + pTsdb->fs.pDelFile->nRef = 1; + + if (pDelFile) { + nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); + if (nRef == 0) { + tsdbDelFileName(pTsdb, pDelFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pDelFile); + } + } + } + } else { + ASSERT(pTsdb->fs.pDelFile == NULL); + } + + // data + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); + int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); + SDFileSet fSet; + int8_t sameDisk; + + if (iOld >= nOld && iNew >= nNew) break; + + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; + + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + goto _merge_old_and_new; + } else if (pSetOld->fid < pSetNew->fid) { + goto _remove_old; + } else { + goto _add_new; + } + } else if (pSetOld) { + goto _remove_old; + } else { + goto _add_new; + } + + _merge_old_and_new: + sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + + // head + fSet.pHeadF = pSetOld->pHeadF; + if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) { + pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetOld->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; + + nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname); + taosRemoveFile(fname); + taosMemoryFree(fSet.pHeadF); + } + } else { + ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size); + ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset); + } + + // data + fSet.pDataF = pSetOld->pDataF; + if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) { + pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetOld->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; + + nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname); + taosRemoveFile(fname); + taosMemoryFree(fSet.pDataF); + } + } else { + ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size); + pSetOld->pDataF->size = pSetNew->pDataF->size; + } + + // sma + fSet.pSmaF = pSetOld->pSmaF; + if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) { + pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetOld->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; + + nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname); + taosRemoveFile(fname); + taosMemoryFree(fSet.pSmaF); + } + } else { + ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size); + pSetOld->pSmaF->size = pSetNew->pSmaF->size; + } + + // stt + if (sameDisk) { + if (pSetNew->nSttF > pSetOld->nSttF) { + ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1); + pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; + pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; + pSetOld->nSttF++; + } else if (pSetNew->nSttF < pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + pSetOld->aSttF[iStt] = NULL; + } + + pSetOld->nSttF = 1; + pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; + pSetOld->aSttF[0]->nRef = 1; + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } else { + ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); + } + } + } + } else { + ASSERT(pSetOld->nSttF == pSetNew->nSttF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } + } + + if (!sameDisk) { + pSetOld->diskId = pSetNew->diskId; + } + + iOld++; + iNew++; + continue; + + _remove_old: + nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSetOld->pHeadF); + } + + nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSetOld->pDataF); + } + + nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSetOld->pSmaF); + } + + for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname); + taosRemoveFile(fname); + taosMemoryFree(pSetOld->aSttF[iStt]); + } + } + + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + continue; + + _add_new: + fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1}; + + // head + fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (fSet.pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *fSet.pHeadF = *pSetNew->pHeadF; + fSet.pHeadF->nRef = 1; + + // data + fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (fSet.pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *fSet.pDataF = *pSetNew->pDataF; + fSet.pDataF->nRef = 1; + + // sma + fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (fSet.pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *fSet.pSmaF = *pSetNew->pSmaF; + fSet.pSmaF->nRef = 1; + + // stt + ASSERT(pSetNew->nSttF == 1); + fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (fSet.aSttF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + *fSet.aSttF[0] = *pSetNew->aSttF[0]; + fSet.aSttF[0]->nRef = 1; + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + iOld++; + iNew++; + continue; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +#endif + int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; int32_t nRef; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index ec760e3c57..71d745972e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -71,6 +71,9 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } + pTsdb->trimHdl.maxRetentFid = INT32_MIN; + pTsdb->trimHdl.minCommitFid = INT32_MAX; + tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c index 4bdae09d88..61ed29368a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -15,16 +15,18 @@ #include "tsdb.h" -static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { - if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) { - return false; - } +enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0); - if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) { - return true; - } +#define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB +#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB +#define MIGRATE_MIN_COST (5) // second +static bool tsdbShouldDoMigrate(STsdb *pTsdb); +static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); +static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention); +static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention); + +static bool tsdbShouldDoMigrate(STsdb *pTsdb) { if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { return false; } @@ -33,67 +35,203 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) { return false; } + return true; +} - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { +static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { + int32_t retention = RETENTION_NO; + if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) { + return retention; + } + + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0); + if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) { + retention |= RETENTION_EXPIRED; + } + + if (!tsdbShouldDoMigrate(pTsdb)) { + return retention; + } + + for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); ++iSet) { pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, keepCfg, now); - SDiskID did; + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); if (expLevel == pSet->diskId.level) continue; - if (expLevel < 0) { - return true; - } else { - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - return false; - } - - if (did.level == pSet->diskId.level) continue; - - return true; + if (expLevel > 0) { + retention |= RETENTION_MIGRATE; + break; } } - return false; + return retention; } -/** - * @brief Data migration between multi-tier storage, including remove expired data. - * 1) firstly, remove expired DFileSet; - * 2) partition the tsdbFS by the expLevel and fileSize(e.g. 500G, configurable), and migrate DFileSet groups between multi-tier storage; - * 3) update the tsdbFS and CURRENT in the same transaction; - * 4) finish - * @param pTsdb - * @param now - * @return int32_t - */ -int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { +static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention) { int32_t code = 0; + int32_t nLoops = 0; + int32_t maxFid = INT32_MIN; + STsdbFS fs = {0}; + STsdbFS fsLatest = {0}; - if (!tsdbShouldDoRetention(pTsdb, now)) { - return code; + if (!(retention & RETENTION_EXPIRED)) { + goto _exit; } - // do retention - STsdbFS fs; - code = tsdbFSCopy(pTsdb, &fs); - if (code) goto _err; + if (code) goto _exit; - for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { + int32_t fsSize = taosArrayGetSize(fs.aDFileSet); + for (int32_t iSet = 0; iSet < fsSize; iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDiskID did; if (expLevel < 0) { - taosMemoryFree(pSet->pHeadF); - taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->aSttF[0]); - taosMemoryFree(pSet->pSmaF); - taosArrayRemove(fs.aDFileSet, iSet); - iSet--; + SET_DFSET_EXPIRED(pSet); + if (pSet->fid > maxFid) maxFid = pSet->fid; } else { - if (expLevel == 0) continue; + break; + } + } + + if (maxFid == INT32_MIN) goto _exit; + +_wait_commit_end: + while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + if (++nLoops > 1000) { + nLoops = 0; + sched_yield(); + printf("%s:%d sche_yield() minCommitFid:%d maxFid:%d\n", __func__, __LINE__, pTsdb->trimHdl.minCommitFid, maxFid); + } + } + if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { + if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + atomic_store_8(&pTsdb->trimHdl.state, 0); + goto _wait_commit_end; + } + atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); + atomic_store_8(&pTsdb->trimHdl.state, 0); + } else { + goto _wait_commit_end; + } + +_merge_fs: + taosThreadRwlockWrlock(&pTsdb->rwLock); + if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + // 1) merge tsdbFSNew and pTsdb->fs + if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + // 2) save CURRENT + if ((code = tsdbFSCommit1(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + // 3) apply the tsdbFS to pTsdb->fs + if ((code = tsdbFSCommit2(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + taosThreadRwlockUnlock(&pTsdb->rwLock); + +_exit: + tsdbFSDestroy(&fs); + tsdbFSDestroy(&fsLatest); + if (code != 0) { + tsdbError("vgId:%d, tsdb do retention(expire) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + ASSERT(0); + } + return code; +} + +/** + * @brief + * + * @param pTsdb + * @param now + * @param retention + * @return int32_t + */ +static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention) { + int32_t code = 0; + int32_t nLoops = 0; + int32_t maxFid = INT32_MIN; + int64_t fSize = 0; + STsdbFS fs = {0}; + STsdbFS fsLatest = {0}; + + if (!(retention & RETENTION_MIGRATE)) { + goto _exit; + } + +_migrate_loop: + // reset + maxFid = INT32_MIN; + fSize = 0; + + code = tsdbFSCopy(pTsdb, &fs); + if (code) goto _exit; + + int32_t fsSize = taosArrayGetSize(fs.aDFileSet); + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; + + if (expLevel > 0) { + ASSERT(pSet->fid > maxFid); + maxFid = pSet->fid; + fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); + if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { + break; + } + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + fSize += pSet->aSttF[iStt]->size; + } + if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { + break; + } + } + } + + if (maxFid == INT32_MIN) goto _exit; + +_wait_commit_end: + while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + if (++nLoops > 1000) { + nLoops = 0; + sched_yield(); + printf("%s:%d sche_yield()\n", __func__, __LINE__); + } + } + if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { + if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + atomic_store_8(&pTsdb->trimHdl.state, 0); + goto _wait_commit_end; + } + atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); + atomic_store_8(&pTsdb->trimHdl.state, 0); + } else { + goto _wait_commit_end; + } + + // migrate + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; + + if (pSet->fid > maxFid) break; + + if (expLevel < 0) { + SET_DFSET_EXPIRED(pSet); + } else if (expLevel > 0) { if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { code = terrno; goto _exit; @@ -101,40 +239,87 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (did.level == pSet->diskId.level) continue; - // copy file to new disk (todo) + // copy file to new disk SDFileSet fSet = *pSet; fSet.diskId = did; code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); - if (code) goto _err; + if (code) goto _exit; code = tsdbFSUpsertFSet(&fs, &fSet); - if (code) goto _err; + if (code) goto _exit; } } - // do change fs - code = tsdbFSCommit1(pTsdb, &fs); - if (code) goto _err; - +_merge_fs: taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit2(pTsdb, &fs); - if (code) { + if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; + goto _exit; + } + // 1) merge tsdbFSNew and pTsdb->fs + if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + // 2) save CURRENT + if ((code = tsdbFSCommit1(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + // 3) apply the tsdbFS to pTsdb->fs + if ((code = tsdbFSCommit2(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; } - taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbFSDestroy(&fs); _exit: + if (code != 0) { + tsdbError("vgId:%d, tsdb do retention(migrate) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + ASSERT(0); + } return code; +} -_err: - tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - ASSERT(0); - // tsdbFSRollback(pTsdb->pFS); +/** + * @brief Data migration between multi-tier storage, including remove expired data. + * 1) firstly, remove expired DFileSet; + * 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate + * DFileSet groups between multi-tier storage; + * 3) update the tsdbFS and CURRENT in the same transaction; + * 4) finish the migration + * @param pTsdb + * @param now + * @param maxSpeed + * @return int32_t + */ +int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int32_t maxSpeed) { + int32_t code = 0; + int32_t retention = RETENTION_NO; + + retention = tsdbShouldDoRetention(pTsdb, now); + if (retention == RETENTION_NO) { + goto _exit; + } + + // step 1: process expire + code = tsdbProcessExpire(pTsdb, now, retention); + if (code < 0) goto _exit; + + // step 2: process multi-tier migration + code = tsdbProcessMigrate(pTsdb, now, maxSpeed, retention); + if (code < 0) goto _exit; + +_exit: + pTsdb->trimHdl.maxRetentFid = INT32_MIN; + if (code != 0) { + tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + ASSERT(0); + // tsdbFSRollback(pTsdb->pFS); + } return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 728b5b4cda..0263740e4a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -417,24 +417,29 @@ void *vnodeProcessTrimReqFunc(void *param) { setThreadName("vnode-trim"); // process - code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp); + code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); if (code) goto _exit; - code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp); + code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); if (code) goto _exit; - _exit: - vInfo("vgId:%d, trim vnode thread finished, time:%d", TD_VID(pVnode), pReq->trimReq.timestamp); oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0); ASSERT(oldVal == 1); taosMemoryFree(pReq); + if (code) { + vError("vgId:%d, trim vnode thread failed since %s, time:%" PRIi64, TD_VID(pVnode), tstrerror(code), + pReq->trimReq.timestamp); + } else { + vInfo("vgId:%d, trim vnode thread finished, time:%" PRIi64, TD_VID(pVnode), pReq->trimReq.timestamp); + } + return NULL; } static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVndTrimDbReq *pVndTrimReq = taosMemoryMalloc(sizeof(SVndTrimDbReq)); - STrimDbHandle *pHandle = &pVnode->trimDbH; + SVTrimDbHdl *pHandle = &pVnode->trimDbH; if (!pVndTrimReq) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -450,7 +455,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, } if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) { - vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%d", TD_VID(pVnode), + vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%" PRIi64, TD_VID(pVnode), pVndTrimReq->trimReq.timestamp); taosMemoryFree(pVndTrimReq); goto _exit;