diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7aec00c7c1..949446cc33 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -873,7 +873,8 @@ int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); typedef struct { - int32_t timestamp; + int64_t timestamp; // unit: millisecond + int64_t maxSpeed; // 0 no limit, unit: Byte/s } SVTrimDbReq; int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f4ffc4c996..04aed7ca36 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2708,7 +2708,8 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; + if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1; + if (tEncodeI64(&encoder, pReq->maxSpeed) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2721,7 +2722,8 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->maxSpeed) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a05d8dd739..cb47e9dac1 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1385,11 +1385,19 @@ _OVER: return code; } -static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { +/** + * @brief trim database + * + * @param pMnode + * @param pDb + * @param maxSpeed MB/s + * @return int32_t + */ +static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()}; + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = maxSpeed << 20}; int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -1413,7 +1421,8 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { if (code != 0) { mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); } else { - mInfo("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp); + mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64 ", max speed:%" PRIi64, pVgroup->vgId, + trimReq.timestamp, trimReq.maxSpeed); } sdbRelease(pSdb, pVgroup); } @@ -1443,7 +1452,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndTrimDb(pMnode, pDb); + code = mndTrimDb(pMnode, pDb, trimReq.maxSpeed); _OVER: if (code != 0) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 916311bbee..180be0ff6e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,6 +64,7 @@ typedef struct SDelFWriter SDelFWriter; typedef struct SDelFReader SDelFReader; typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; +typedef struct STsdbTrimHdl STsdbTrimHdl; typedef struct SRowMerger SRowMerger; typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; @@ -248,6 +249,7 @@ int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS); int32_t tDFileSetCmprFn(const void *p1, const void *p2); +int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid); int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); @@ -268,7 +270,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); @@ -320,10 +322,18 @@ int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSc // structs ======================= struct STsdbFS { + int64_t version; SDelFile *pDelFile; SArray *aDFileSet; // SArray }; +struct STsdbTrimHdl { + volatile int8_t state; // 0 idle 1 in use + volatile int8_t commitInWait; // 0 not in wait, 1 in wait + volatile int32_t maxRetentFid; + volatile int32_t minCommitFid; +}; + struct STsdb { char *path; SVnode *pVnode; @@ -332,6 +342,7 @@ struct STsdb { SMemTable *mem; SMemTable *imem; STsdbFS fs; + STsdbTrimHdl trimHdl; SLRUCache *lruCache; TdThreadMutex lruMutex; }; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4c8045d651..8757dc88e6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -58,6 +58,7 @@ typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; +typedef struct SVTrimDbHdl SVTrimDbHdl; typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; @@ -144,7 +145,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); +int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int64_t maxSpeed); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, @@ -199,7 +200,7 @@ int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); -int32_t smaDoRetention(SSma* pSma, int64_t now); +int32_t smaDoRetention(SSma* pSma, int64_t now, int64_t maxSpeed); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); @@ -301,6 +302,9 @@ struct STsdbKeepCfg { int32_t keep1; int32_t keep2; }; +struct SVTrimDbHdl { + volatile int8_t state; // 0 not in trim, 1 in trim +}; struct SVnode { char* path; @@ -325,6 +329,7 @@ struct SVnode { bool restored; tsem_t syncSem; SQHandle* pQuery; + SVTrimDbHdl trimDbH; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8d1525e081..e149abbace 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -661,9 +661,10 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { * * @param pSma * @param now + * @param maxSpeed * @return int32_t */ -int32_t smaDoRetention(SSma *pSma, int64_t now) { +int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) { int32_t code = TSDB_CODE_SUCCESS; if (!VND_IS_RSMA(pSma->pVnode)) { return code; @@ -671,7 +672,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now, maxSpeed); if (code) goto _end; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 5403395623..0bfb90a374 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -756,6 +756,32 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + + if (pTsdb->imem->nRow > 0) { + int32_t minCommitFid = tsdbKeyFid(pTsdb->imem->minKey, pCommitter->minutes, pCommitter->precision); + int32_t nLoops = 0; + + _wait_retention_end: + while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { + atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 0, 1); + if (++nLoops > 1000) { + nLoops = 0; + sched_yield(); + } + } + if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { + if (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { + atomic_store_8(&pTsdb->trimHdl.state, 0); + goto _wait_retention_end; + } + atomic_store_32(&pTsdb->trimHdl.minCommitFid, minCommitFid); + atomic_store_8(&pTsdb->trimHdl.state, 0); + } else { + goto _wait_retention_end; + } + atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 1, 0); + } + code = tsdbFSCopy(pTsdb, &pCommitter->fs); if (code) goto _err; @@ -962,20 +988,38 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; + STsdbFS fsLatest = {0}; ASSERT(eno == 0); - code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - if (code) goto _err; - // lock taosThreadRwlockWrlock(&pTsdb->rwLock); + + ASSERT(pCommitter->fs.version <= pTsdb->fs.version); + + if (pCommitter->fs.version < pTsdb->fs.version) { + if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + + if ((code = tsdbFSUpdDel(pTsdb, &pCommitter->fs, &fsLatest, pTsdb->trimHdl.minCommitFid - 1))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + } + + code = tsdbFSCommit1(pTsdb, &pCommitter->fs); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } // commit or rollback code = tsdbFSCommit2(pTsdb, &pCommitter->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; + goto _exit; } pTsdb->imem = NULL; @@ -983,20 +1027,23 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); +_exit: tsdbUnrefMemTable(pMemTable); tsdbFSDestroy(&pCommitter->fs); + tsdbFSDestroy(&fsLatest); taosArrayDestroy(pCommitter->aTbDataP); + atomic_store_32(&pTsdb->trimHdl.minCommitFid, INT32_MAX); // if (pCommitter->toMerge) { // code = tsdbMerge(pTsdb); // if (code) goto _err; // } - - tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); - return code; - -_err: - tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if(code == 0) { + tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); + } else { + tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + } + return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 10926ae6ad..ef4f6930be 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -250,7 +250,7 @@ _err: void tsdbFSDestroy(STsdbFS *pFS) { if (pFS->pDelFile) { - taosMemoryFree(pFS->pDelFile); + taosMemoryFreeClear(pFS->pDelFile); } for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) { @@ -263,7 +263,7 @@ void tsdbFSDestroy(STsdbFS *pFS) { } } - taosArrayDestroy(pFS->aDFileSet); + pFS->aDFileSet = taosArrayDestroy(pFS->aDFileSet); } static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { @@ -419,6 +419,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t code = 0; // open handle + pTsdb->fs.version = 0; pTsdb->fs.pDelFile = NULL; pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); if (pTsdb->fs.aDFileSet == NULL) { @@ -534,6 +535,7 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; + pFS->version = pTsdb->fs.version; pFS->pDelFile = NULL; pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); if (pFS->aDFileSet == NULL) { @@ -664,6 +666,9 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } } + // update the diskId + pDFileSet->diskId = pSet->diskId; + goto _exit; } } @@ -712,6 +717,108 @@ _exit: return code; } +/** + * @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew. + * + * @param pTsdb + * @param pFS + * @param pFSNew + * @param maxFid + * @return int32_t + */ +int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid) { + int32_t code = 0; + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN]; + + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pFS->aDFileSet); + int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); + SDFileSet fSet; + int8_t sameDisk; + + if (iOld >= nOld && iNew >= nNew) break; + + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pFS->aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; + + if (pSetNew && (pSetNew->fid > maxFid)) break; + + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + goto _merge_migrate; + } else if (pSetOld->fid > pSetNew->fid) { + goto _remove_old; + } else { + ++iOld; + ASSERT(0); + } + continue; + } else { + break; + } + + _merge_migrate: + sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + + ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID); + ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size); + ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset); + + if (!sameDisk) { + // head + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; + + // data + ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size); + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; + + // sma + ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size); + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; + + // stt + ASSERT(pSetOld->nSttF == pSetNew->nSttF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { + ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); + + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } + + // set diskId + pSetOld->diskId = pSetNew->diskId; + } + + ++iOld; + ++iNew; + continue; + + _remove_old: + taosMemoryFree(pSetOld->pHeadF); + taosMemoryFree(pSetOld->pDataF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { + taosMemoryFree(pSetOld->aSttF[iStt]); + } + taosMemoryFree(pSetOld->pSmaF); + taosArrayRemove(pFS->aDFileSet, iOld); + ++iNew; + continue; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb fs upd/del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t code = 0; char tfname[TSDB_FILENAME_LEN]; @@ -745,6 +852,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t nRef; char fname[TSDB_FILENAME_LEN]; + ++pTsdb->fs.version; + // del if (pFSNew->pDelFile) { SDelFile *pDelFile = pTsdb->fs.pDelFile; @@ -921,8 +1030,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; pSetOld->aSttF[iStt]->nRef = 1; } else { - ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); + ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index ec760e3c57..8ce61d49b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -71,6 +71,10 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } + pTsdb->trimHdl.maxRetentFid = INT32_MIN; + pTsdb->trimHdl.minCommitFid = INT32_MAX; + pTsdb->trimHdl.commitInWait = 0; + tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index fc577e3962..282640b52c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,7 +607,66 @@ _err: return code; } -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { +/** + * @brief send file with limited speed(rough control) + * + * @param pTsdb + * @param pFileOut + * @param pFileIn + * @param size + * @param speed 0 no limit, unit: B/s + * @return int64_t + */ +static int64_t tsdbFSendFile(STsdb *pTsdb, TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int64_t speed) { + if (speed <= 0) { + return taosFSendFile(pOutFD, pInFD, 0, size); + } + + int64_t offset = 0; + int64_t tBytes = 0; + int64_t nBytes = 0; + int64_t startMs = 0; + int64_t cost = 0; + + while ((offset + nBytes) < size) { + if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { + tsdbDebug("vgId:%d sendFile without limit since conflicts, fSize:%" PRIi64 ", maxSpeed:%" PRIi64, + TD_VID(pTsdb->pVnode), size, speed); + goto _send_remain; + } + startMs = taosGetTimestampMs(); + if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed)) < 0) { + return nBytes; + } + cost = taosGetTimestampMs() - startMs; + tBytes += nBytes; + + int64_t nSleep = 0; + if (cost < 0) { + nSleep = 1000; + } else if (cost < 1000) { + nSleep = 1000 - cost; + } + if (nSleep > 0) { + taosMsleep(nSleep); + tsdbDebug("vgId:%d sendFile and msleep:%" PRIi64 ", fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, + TD_VID(pTsdb->pVnode), nSleep, size, tBytes, speed); + } + } + +_send_remain: + if (offset < size) { + if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) { + return nBytes; + } + tBytes += nBytes; + tsdbDebug("vgId:%d sendFile remain, fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, TD_VID(pTsdb->pVnode), + size, tBytes, speed); + } + return tBytes; +} + +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed) { int32_t code = 0; int64_t n; int64_t size; @@ -620,7 +679,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { // head tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo); - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -630,7 +689,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage)); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -641,7 +700,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { // data tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom); tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo); - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -651,7 +710,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage)); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -662,7 +721,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { // sma tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom); tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo); - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -672,7 +731,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage)); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -684,7 +743,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { for (int8_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[iStt], fNameFrom); tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[iStt], fNameTo); - pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -694,7 +753,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2c68c57176..f3890b7b53 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,98 +15,296 @@ #include "tsdb.h" -static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; +enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; + +#define MIGRATE_MAX_SPEED (1048576 << 4) // 16 MB, vnode level +#define MIGRATE_MIN_COST (5) // second + +static bool tsdbShouldDoMigrate(STsdb *pTsdb); +static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); +static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type); + +static bool tsdbShouldDoMigrate(STsdb *pTsdb) { + if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { + return false; + } + + STsdbKeepCfg *keepCfg = &pTsdb->keepCfg; + if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) { + return false; + } + return true; +} + +static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { + int32_t retention = RETENTION_NO; + if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) { + return retention; + } + + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0); + if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) { + retention |= RETENTION_EXPIRED; + } + + if (!tsdbShouldDoMigrate(pTsdb)) { + return retention; + } + + for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); ++iSet) { + pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); if (expLevel == pSet->diskId.level) continue; - if (expLevel < 0) { - return true; - } else { - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - return false; - } - - if (did.level == pSet->diskId.level) continue; - - return true; + if (expLevel > 0) { + retention |= RETENTION_MIGRATE; + break; } } - return false; + return retention; } -int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { +/** + * @brief process retention + * + * @param pTsdb + * @param now + * @param maxSpeed + * @param retention + * @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE + * @return int32_t + */ +static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type) { int32_t code = 0; + int32_t nBatch = 0; + int32_t nLoops = 0; + int32_t maxFid = 0; + int64_t fSize = 0; + int64_t speed = maxSpeed > 0 ? maxSpeed : MIGRATE_MAX_SPEED; + STsdbFS fs = {0}; + STsdbFS fsLatest = {0}; - if (!tsdbShouldDoRetention(pTsdb, now)) { - return code; + if (!(retention & type)) { + goto _exit; } - // do retention - STsdbFS fs; +_retention_loop: + // reset + maxFid = INT32_MIN; + fSize = 0; + tsdbFSDestroy(&fs); + tsdbFSDestroy(&fsLatest); + + if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { + atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN); + taosMsleep(50); + } code = tsdbFSCopy(pTsdb, &fs); - if (code) goto _err; + if (code) goto _exit; - for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; + if (type == RETENTION_MIGRATE) { + int32_t fsSize = taosArrayGetSize(fs.aDFileSet); + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; - if (expLevel < 0) { - taosMemoryFree(pSet->pHeadF); - taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->aSttF[0]); - taosMemoryFree(pSet->pSmaF); - taosArrayRemove(fs.aDFileSet, iSet); - iSet--; - } else { - if (expLevel == 0) continue; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - code = terrno; - goto _exit; + if (pSet->diskId.level == expLevel) continue; + + if (expLevel > 0) { + ASSERT(pSet->fid > maxFid); + maxFid = pSet->fid; + fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); + if (fSize / speed > MIGRATE_MIN_COST) { + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); + break; + } + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + fSize += pSet->aSttF[iStt]->size; + } + if (fSize / speed > MIGRATE_MIN_COST) { + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); + break; + } } + } + } else if (type == RETENTION_EXPIRED) { + for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; - if (did.level == pSet->diskId.level) continue; - - // copy file to new disk (todo) - SDFileSet fSet = *pSet; - fSet.diskId = did; - - code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); - if (code) goto _err; - - code = tsdbFSUpsertFSet(&fs, &fSet); - if (code) goto _err; + if (expLevel < 0) { + ASSERT(pSet->fid > maxFid); + if (pSet->fid > maxFid) maxFid = pSet->fid; + taosMemoryFree(pSet->pHeadF); + taosMemoryFree(pSet->pDataF); + taosMemoryFree(pSet->pSmaF); + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + taosMemoryFree(pSet->aSttF[iStt]); + } + taosArrayRemove(fs.aDFileSet, iSet); + --iSet; + } else { + break; + } } } - // do change fs - code = tsdbFSCommit1(pTsdb, &fs); - if (code) goto _err; + if (maxFid == INT32_MIN) goto _exit; - taosThreadRwlockWrlock(&pTsdb->rwLock); - - code = tsdbFSCommit2(pTsdb, &fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; +_commit_conflict_check: + while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + if (++nLoops > 1000) { + nLoops = 0; + sched_yield(); + } + } + if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { + if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { + atomic_store_8(&pTsdb->trimHdl.state, 0); + goto _commit_conflict_check; + } + atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); + atomic_store_8(&pTsdb->trimHdl.state, 0); + } else { + goto _commit_conflict_check; } + // migrate + if (type == RETENTION_MIGRATE) { + for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; + + if (pSet->fid > maxFid) break; + + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), + nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); + + if (expLevel < 0) { + taosMemoryFree(pSet->pHeadF); + taosMemoryFree(pSet->pDataF); + taosMemoryFree(pSet->pSmaF); + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + taosMemoryFree(pSet->aSttF[iStt]); + } + taosArrayRemove(fs.aDFileSet, iSet); + --iSet; + } else { + if (expLevel == pSet->diskId.level) continue; + + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + goto _exit; + } + + if (did.level == pSet->diskId.level) continue; + + // copy file to new disk + SDFileSet fSet = *pSet; + fSet.diskId = did; + + code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed); + if (code) goto _exit; + + code = tsdbFSUpsertFSet(&fs, &fSet); + if (code) goto _exit; + } + } + } + +_merge_fs: + taosThreadRwlockWrlock(&pTsdb->rwLock); + + // 1) prepare fs, merge tsdbFSNew and pTsdb->fs if needed + STsdbFS *pTsdbFS = &fs; + ASSERT(fs.version <= pTsdb->fs.version); + + if (fs.version < pTsdb->fs.version) { + if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + + if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + pTsdbFS = &fsLatest; + } + + // 2) save CURRENT + if ((code = tsdbFSCommit1(pTsdb, pTsdbFS))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } + + // 3) apply the tsdbFS to pTsdb->fs + if ((code = tsdbFSCommit2(pTsdb, pTsdbFS))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _exit; + } taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbFSDestroy(&fs); + if (type == RETENTION_MIGRATE) { + ++nBatch; + goto _retention_loop; + } _exit: + tsdbFSDestroy(&fs); + tsdbFSDestroy(&fsLatest); + if (code != 0) { + tsdbError("vgId:%d, tsdb do retention %" PRIi8 " failed since %s", TD_VID(pTsdb->pVnode), type, tstrerror(code)); + ASSERT(0); + } return code; +} -_err: - tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - ASSERT(0); - // tsdbFSRollback(pTsdb->pFS); +/** + * @brief Data migration between multi-tier storage, including remove expired data. + * 1) firstly, remove expired DFileSet; + * 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate + * DFileSet groups between multi-tier storage; + * 3) update the tsdbFS and CURRENT in the same transaction; + * 4) finish the migration + * @param pTsdb + * @param now + * @param maxSpeed + * @return int32_t + */ +int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) { + int32_t code = 0; + int32_t retention = RETENTION_NO; + + retention = tsdbShouldDoRetention(pTsdb, now); + if (retention == RETENTION_NO) { + goto _exit; + } + + // step 1: process expire + code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_EXPIRED); + if (code < 0) goto _exit; + + // step 2: process multi-tier migration + code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_MIGRATE); + if (code < 0) goto _exit; + +_exit: + pTsdb->trimHdl.maxRetentFid = INT32_MIN; + if (code != 0) { + tsdbError("vgId:%d, tsdb do retention %d failed since %s, time:%" PRIi64 ", max speed:%" PRIi64, + TD_VID(pTsdb->pVnode), retention, tstrerror(code), now, maxSpeed); + ASSERT(0); + // tsdbFSRollback(pTsdb->pFS); + } else { + tsdbInfo("vgId:%d, tsdb do retention %d succeed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pTsdb->pVnode), + retention, now, maxSpeed); + } return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 4999e7a49a..5e76e2b4d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -509,16 +509,24 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK *maxKey = *minKey + minutes * tsTickPerMin[precision] - 1; } +/** + * @brief get fid level by keep and days. + * + * @param fid + * @param pKeepCfg + * @param now millisecond + * @return int32_t + */ int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) { int32_t aFid[3]; TSKEY key; if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) { - now = now * 1000; + // now = now * 1000; } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) { - now = now * 1000000l; + now = now * 1000l; } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) { - now = now * 1000000000l; + now = now * 1000000l; } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4ccfea4051..481c8b66be 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -186,6 +186,17 @@ void vnodePreClose(SVnode *pVnode) { } } +static void vnodeTrimDbClose(SVnode *pVnode) { + int32_t nLoops = 0; + while (atomic_load_8(&pVnode->trimDbH.state) != 0) { + if (++nLoops > 1000) { + vTrace("vgId:%d, wait for trimDb task to finish", TD_VID(pVnode)); + sched_yield(); + nLoops = 0; + } + } +} + void vnodeClose(SVnode *pVnode) { if (pVnode) { vnodeCommit(pVnode); @@ -197,6 +208,7 @@ void vnodeClose(SVnode *pVnode) { smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); + vnodeTrimDbClose(pVnode); // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6e9eba306a..464fd50ab0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -378,6 +378,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } +#if 0 static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVTrimDbReq trimReq = {0}; @@ -388,7 +389,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, goto _exit; } - vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); + vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, pVnode->config.vgId, trimReq.timestamp); // process code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); @@ -400,6 +401,91 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, _exit: return code; } +#endif + +typedef struct { + SVnode *pVnode; + SVTrimDbReq trimReq; +} SVndTrimDbReq; + +void *vnodeProcessTrimReqFunc(void *param) { + int32_t code = 0; + int8_t oldVal = 0; + SVndTrimDbReq *pReq = (SVndTrimDbReq *)param; + SVnode *pVnode = pReq->pVnode; + + setThreadName("vnode-trim"); + + // process + code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); + if (code) goto _exit; + + code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); + if (code) goto _exit; +_exit: + oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0); + ASSERT(oldVal == 1); + if (code) { + vError("vgId:%d, trim vnode thread failed since %s, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), + tstrerror(code), pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); + } else { + vInfo("vgId:%d, trim vnode thread finish, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), + pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); + } + taosMemoryFree(pReq); + return NULL; +} + +static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + int32_t code = 0; + SVndTrimDbReq *pVndTrimReq = taosMemoryMalloc(sizeof(SVndTrimDbReq)); + SVTrimDbHdl *pHandle = &pVnode->trimDbH; + + if (!pVndTrimReq) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pVndTrimReq->pVnode = pVnode; + + if (tDeserializeSVTrimDbReq(pReq, len, &pVndTrimReq->trimReq) != 0) { + taosMemoryFree(pVndTrimReq); + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) { + vInfo("vgId:%d, trim vnode request ignored since duplicated req, time:%" PRIi64 ", max speed:%" PRIi64, + TD_VID(pVnode), pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); + taosMemoryFree(pVndTrimReq); + goto _exit; + } + + vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), + pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); + + TdThreadAttr thAttr = {0}; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); + + TdThread tid; + if (taosThreadCreate(&tid, &thAttr, vnodeProcessTrimReqFunc, (void *)pVndTrimReq) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pVndTrimReq); + taosThreadAttrDestroy(&thAttr); + int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0); + ASSERT(oldVal == 1); + vError("vgId:%d, failed to create pthread to trim vnode since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + vDebug("vgId:%d, success to create pthread to trim vnode", TD_VID(pVnode)); + + taosThreadAttrDestroy(&thAttr); + +_exit: + terrno = code; + return code; +} static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); @@ -921,7 +1007,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true);