diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 949446cc33..7aec00c7c1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -873,8 +873,7 @@ int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); typedef struct { - int64_t timestamp; // unit: millisecond - int64_t maxSpeed; // 0 no limit, unit: Byte/s + int32_t timestamp; } SVTrimDbReq; int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 04aed7ca36..f4ffc4c996 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2708,8 +2708,7 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1; - if (tEncodeI64(&encoder, pReq->maxSpeed) < 0) return -1; + if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2722,8 +2721,7 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->maxSpeed) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index cb47e9dac1..a05d8dd739 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1385,19 +1385,11 @@ _OVER: return code; } -/** - * @brief trim database - * - * @param pMnode - * @param pDb - * @param maxSpeed MB/s - * @return int32_t - */ -static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) { +static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = maxSpeed << 20}; + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()}; int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -1421,8 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) { if (code != 0) { mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); } else { - mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64 ", max speed:%" PRIi64, pVgroup->vgId, - trimReq.timestamp, trimReq.maxSpeed); + mInfo("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp); } sdbRelease(pSdb, pVgroup); } @@ -1452,7 +1443,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndTrimDb(pMnode, pDb, trimReq.maxSpeed); + code = mndTrimDb(pMnode, pDb); _OVER: if (code != 0) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 180be0ff6e..916311bbee 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,7 +64,6 @@ typedef struct SDelFWriter SDelFWriter; typedef struct SDelFReader SDelFReader; typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; -typedef struct STsdbTrimHdl STsdbTrimHdl; typedef struct SRowMerger SRowMerger; typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; @@ -249,7 +248,6 @@ int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS); int32_t tDFileSetCmprFn(const void *p1, const void *p2); -int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid); int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); @@ -270,7 +268,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed); +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); @@ -322,18 +320,10 @@ int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSc // structs ======================= struct STsdbFS { - int64_t version; SDelFile *pDelFile; SArray *aDFileSet; // SArray }; -struct STsdbTrimHdl { - volatile int8_t state; // 0 idle 1 in use - volatile int8_t commitInWait; // 0 not in wait, 1 in wait - volatile int32_t maxRetentFid; - volatile int32_t minCommitFid; -}; - struct STsdb { char *path; SVnode *pVnode; @@ -342,7 +332,6 @@ struct STsdb { SMemTable *mem; SMemTable *imem; STsdbFS fs; - STsdbTrimHdl trimHdl; SLRUCache *lruCache; TdThreadMutex lruMutex; }; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8757dc88e6..4c8045d651 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -58,7 +58,6 @@ typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; -typedef struct SVTrimDbHdl SVTrimDbHdl; typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; @@ -145,7 +144,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int64_t maxSpeed); +int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, @@ -200,7 +199,7 @@ int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); -int32_t smaDoRetention(SSma* pSma, int64_t now, int64_t maxSpeed); +int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); @@ -302,9 +301,6 @@ struct STsdbKeepCfg { int32_t keep1; int32_t keep2; }; -struct SVTrimDbHdl { - volatile int8_t state; // 0 not in trim, 1 in trim -}; struct SVnode { char* path; @@ -329,7 +325,6 @@ struct SVnode { bool restored; tsem_t syncSem; SQHandle* pQuery; - SVTrimDbHdl trimDbH; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e149abbace..8d1525e081 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -661,10 +661,9 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { * * @param pSma * @param now - * @param maxSpeed * @return int32_t */ -int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) { +int32_t smaDoRetention(SSma *pSma, int64_t now) { int32_t code = TSDB_CODE_SUCCESS; if (!VND_IS_RSMA(pSma->pVnode)) { return code; @@ -672,7 +671,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbDoRetention(pSma->pRSmaTsdb[i], now, maxSpeed); + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); if (code) goto _end; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0bfb90a374..5403395623 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -756,32 +756,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - - if (pTsdb->imem->nRow > 0) { - int32_t minCommitFid = tsdbKeyFid(pTsdb->imem->minKey, pCommitter->minutes, pCommitter->precision); - int32_t nLoops = 0; - - _wait_retention_end: - while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { - atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 0, 1); - if (++nLoops > 1000) { - nLoops = 0; - sched_yield(); - } - } - if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { - if (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { - atomic_store_8(&pTsdb->trimHdl.state, 0); - goto _wait_retention_end; - } - atomic_store_32(&pTsdb->trimHdl.minCommitFid, minCommitFid); - atomic_store_8(&pTsdb->trimHdl.state, 0); - } else { - goto _wait_retention_end; - } - atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 1, 0); - } - code = tsdbFSCopy(pTsdb, &pCommitter->fs); if (code) goto _err; @@ -988,38 +962,20 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - STsdbFS fsLatest = {0}; ASSERT(eno == 0); + code = tsdbFSCommit1(pTsdb, &pCommitter->fs); + if (code) goto _err; + // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - - ASSERT(pCommitter->fs.version <= pTsdb->fs.version); - - if (pCommitter->fs.version < pTsdb->fs.version) { - if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - - if ((code = tsdbFSUpdDel(pTsdb, &pCommitter->fs, &fsLatest, pTsdb->trimHdl.minCommitFid - 1))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - } - - code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } // commit or rollback code = tsdbFSCommit2(pTsdb, &pCommitter->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; + goto _err; } pTsdb->imem = NULL; @@ -1027,23 +983,20 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); -_exit: tsdbUnrefMemTable(pMemTable); tsdbFSDestroy(&pCommitter->fs); - tsdbFSDestroy(&fsLatest); taosArrayDestroy(pCommitter->aTbDataP); - atomic_store_32(&pTsdb->trimHdl.minCommitFid, INT32_MAX); // if (pCommitter->toMerge) { // code = tsdbMerge(pTsdb); // if (code) goto _err; // } - if(code == 0) { - tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); - } else { - tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - } - + + tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); + return code; + +_err: + tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index ef4f6930be..10926ae6ad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -250,7 +250,7 @@ _err: void tsdbFSDestroy(STsdbFS *pFS) { if (pFS->pDelFile) { - taosMemoryFreeClear(pFS->pDelFile); + taosMemoryFree(pFS->pDelFile); } for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) { @@ -263,7 +263,7 @@ void tsdbFSDestroy(STsdbFS *pFS) { } } - pFS->aDFileSet = taosArrayDestroy(pFS->aDFileSet); + taosArrayDestroy(pFS->aDFileSet); } static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { @@ -419,7 +419,6 @@ int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t code = 0; // open handle - pTsdb->fs.version = 0; pTsdb->fs.pDelFile = NULL; pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); if (pTsdb->fs.aDFileSet == NULL) { @@ -535,7 +534,6 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; - pFS->version = pTsdb->fs.version; pFS->pDelFile = NULL; pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); if (pFS->aDFileSet == NULL) { @@ -666,9 +664,6 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } } - // update the diskId - pDFileSet->diskId = pSet->diskId; - goto _exit; } } @@ -717,108 +712,6 @@ _exit: return code; } -/** - * @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew. - * - * @param pTsdb - * @param pFS - * @param pFSNew - * @param maxFid - * @return int32_t - */ -int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid) { - int32_t code = 0; - int32_t nRef = 0; - char fname[TSDB_FILENAME_LEN]; - - int32_t iOld = 0; - int32_t iNew = 0; - while (true) { - int32_t nOld = taosArrayGetSize(pFS->aDFileSet); - int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet); - SDFileSet fSet; - int8_t sameDisk; - - if (iOld >= nOld && iNew >= nNew) break; - - SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pFS->aDFileSet, iOld) : NULL; - SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL; - - if (pSetNew && (pSetNew->fid > maxFid)) break; - - if (pSetOld && pSetNew) { - if (pSetOld->fid == pSetNew->fid) { - goto _merge_migrate; - } else if (pSetOld->fid > pSetNew->fid) { - goto _remove_old; - } else { - ++iOld; - ASSERT(0); - } - continue; - } else { - break; - } - - _merge_migrate: - sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); - - ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID); - ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size); - ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset); - - if (!sameDisk) { - // head - *pSetOld->pHeadF = *pSetNew->pHeadF; - pSetOld->pHeadF->nRef = 1; - - // data - ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size); - *pSetOld->pDataF = *pSetNew->pDataF; - pSetOld->pDataF->nRef = 1; - - // sma - ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size); - *pSetOld->pSmaF = *pSetNew->pSmaF; - pSetOld->pSmaF->nRef = 1; - - // stt - ASSERT(pSetOld->nSttF == pSetNew->nSttF); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { - ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); - - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } - - // set diskId - pSetOld->diskId = pSetNew->diskId; - } - - ++iOld; - ++iNew; - continue; - - _remove_old: - taosMemoryFree(pSetOld->pHeadF); - taosMemoryFree(pSetOld->pDataF); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { - taosMemoryFree(pSetOld->aSttF[iStt]); - } - taosMemoryFree(pSetOld->pSmaF); - taosArrayRemove(pFS->aDFileSet, iOld); - ++iNew; - continue; - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb fs upd/del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t code = 0; char tfname[TSDB_FILENAME_LEN]; @@ -852,8 +745,6 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t nRef; char fname[TSDB_FILENAME_LEN]; - ++pTsdb->fs.version; - // del if (pFSNew->pDelFile) { SDelFile *pDelFile = pTsdb->fs.pDelFile; @@ -1030,8 +921,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; pSetOld->aSttF[iStt]->nRef = 1; } else { - ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); + ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 8ce61d49b9..ec760e3c57 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -71,10 +71,6 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } - pTsdb->trimHdl.maxRetentFid = INT32_MIN; - pTsdb->trimHdl.minCommitFid = INT32_MAX; - pTsdb->trimHdl.commitInWait = 0; - tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 282640b52c..fc577e3962 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,66 +607,7 @@ _err: return code; } -/** - * @brief send file with limited speed(rough control) - * - * @param pTsdb - * @param pFileOut - * @param pFileIn - * @param size - * @param speed 0 no limit, unit: B/s - * @return int64_t - */ -static int64_t tsdbFSendFile(STsdb *pTsdb, TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int64_t speed) { - if (speed <= 0) { - return taosFSendFile(pOutFD, pInFD, 0, size); - } - - int64_t offset = 0; - int64_t tBytes = 0; - int64_t nBytes = 0; - int64_t startMs = 0; - int64_t cost = 0; - - while ((offset + nBytes) < size) { - if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { - tsdbDebug("vgId:%d sendFile without limit since conflicts, fSize:%" PRIi64 ", maxSpeed:%" PRIi64, - TD_VID(pTsdb->pVnode), size, speed); - goto _send_remain; - } - startMs = taosGetTimestampMs(); - if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed)) < 0) { - return nBytes; - } - cost = taosGetTimestampMs() - startMs; - tBytes += nBytes; - - int64_t nSleep = 0; - if (cost < 0) { - nSleep = 1000; - } else if (cost < 1000) { - nSleep = 1000 - cost; - } - if (nSleep > 0) { - taosMsleep(nSleep); - tsdbDebug("vgId:%d sendFile and msleep:%" PRIi64 ", fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, - TD_VID(pTsdb->pVnode), nSleep, size, tBytes, speed); - } - } - -_send_remain: - if (offset < size) { - if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) { - return nBytes; - } - tBytes += nBytes; - tsdbDebug("vgId:%d sendFile remain, fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, TD_VID(pTsdb->pVnode), - size, tBytes, speed); - } - return tBytes; -} - -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed) { +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { int32_t code = 0; int64_t n; int64_t size; @@ -679,7 +620,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i // head tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo); - pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -689,7 +630,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), maxSpeed); + n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -700,7 +641,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i // data tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom); tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo); - pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -710,7 +651,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), maxSpeed); + n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -721,7 +662,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i // sma tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom); tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo); - pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -731,7 +672,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), maxSpeed); + n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -743,7 +684,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i for (int8_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[iStt], fNameFrom); tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[iStt], fNameTo); - pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -753,7 +694,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), maxSpeed); + n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index f3890b7b53..2c68c57176 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,296 +15,98 @@ #include "tsdb.h" -enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; - -#define MIGRATE_MAX_SPEED (1048576 << 4) // 16 MB, vnode level -#define MIGRATE_MIN_COST (5) // second - -static bool tsdbShouldDoMigrate(STsdb *pTsdb); -static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); -static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type); - -static bool tsdbShouldDoMigrate(STsdb *pTsdb) { - if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { - return false; - } - - STsdbKeepCfg *keepCfg = &pTsdb->keepCfg; - if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) { - return false; - } - return true; -} - -static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { - int32_t retention = RETENTION_NO; - if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) { - return retention; - } - - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0); - if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) { - retention |= RETENTION_EXPIRED; - } - - if (!tsdbShouldDoMigrate(pTsdb)) { - return retention; - } - - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); ++iSet) { - pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); +static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { + for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; if (expLevel == pSet->diskId.level) continue; - if (expLevel > 0) { - retention |= RETENTION_MIGRATE; - break; + if (expLevel < 0) { + return true; + } else { + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { + return false; + } + + if (did.level == pSet->diskId.level) continue; + + return true; } } - return retention; + return false; } -/** - * @brief process retention - * - * @param pTsdb - * @param now - * @param maxSpeed - * @param retention - * @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE - * @return int32_t - */ -static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type) { +int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { int32_t code = 0; - int32_t nBatch = 0; - int32_t nLoops = 0; - int32_t maxFid = 0; - int64_t fSize = 0; - int64_t speed = maxSpeed > 0 ? maxSpeed : MIGRATE_MAX_SPEED; - STsdbFS fs = {0}; - STsdbFS fsLatest = {0}; - if (!(retention & type)) { - goto _exit; + if (!tsdbShouldDoRetention(pTsdb, now)) { + return code; } -_retention_loop: - // reset - maxFid = INT32_MIN; - fSize = 0; - tsdbFSDestroy(&fs); - tsdbFSDestroy(&fsLatest); - - if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { - atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN); - taosMsleep(50); - } + // do retention + STsdbFS fs; code = tsdbFSCopy(pTsdb, &fs); - if (code) goto _exit; + if (code) goto _err; - if (type == RETENTION_MIGRATE) { - int32_t fsSize = taosArrayGetSize(fs.aDFileSet); - for (int32_t iSet = 0; iSet < fsSize; ++iSet) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; + for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; - if (pSet->diskId.level == expLevel) continue; - - if (expLevel > 0) { - ASSERT(pSet->fid > maxFid); - maxFid = pSet->fid; - fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); - if (fSize / speed > MIGRATE_MIN_COST) { - tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); - break; - } - for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { - fSize += pSet->aSttF[iStt]->size; - } - if (fSize / speed > MIGRATE_MIN_COST) { - tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); - break; - } + if (expLevel < 0) { + taosMemoryFree(pSet->pHeadF); + taosMemoryFree(pSet->pDataF); + taosMemoryFree(pSet->aSttF[0]); + taosMemoryFree(pSet->pSmaF); + taosArrayRemove(fs.aDFileSet, iSet); + iSet--; + } else { + if (expLevel == 0) continue; + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + goto _exit; } - } - } else if (type == RETENTION_EXPIRED) { - for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - if (expLevel < 0) { - ASSERT(pSet->fid > maxFid); - if (pSet->fid > maxFid) maxFid = pSet->fid; - taosMemoryFree(pSet->pHeadF); - taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->pSmaF); - for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { - taosMemoryFree(pSet->aSttF[iStt]); - } - taosArrayRemove(fs.aDFileSet, iSet); - --iSet; - } else { - break; - } + if (did.level == pSet->diskId.level) continue; + + // copy file to new disk (todo) + SDFileSet fSet = *pSet; + fSet.diskId = did; + + code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); + if (code) goto _err; + + code = tsdbFSUpsertFSet(&fs, &fSet); + if (code) goto _err; } } - if (maxFid == INT32_MIN) goto _exit; + // do change fs + code = tsdbFSCommit1(pTsdb, &fs); + if (code) goto _err; -_commit_conflict_check: - while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { - if (++nLoops > 1000) { - nLoops = 0; - sched_yield(); - } - } - if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { - if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { - atomic_store_8(&pTsdb->trimHdl.state, 0); - goto _commit_conflict_check; - } - atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); - atomic_store_8(&pTsdb->trimHdl.state, 0); - } else { - goto _commit_conflict_check; - } - - // migrate - if (type == RETENTION_MIGRATE) { - for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - - if (pSet->fid > maxFid) break; - - tsdbDebug("vgId:%d migrate loop %d with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), - nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); - - if (expLevel < 0) { - taosMemoryFree(pSet->pHeadF); - taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->pSmaF); - for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { - taosMemoryFree(pSet->aSttF[iStt]); - } - taosArrayRemove(fs.aDFileSet, iSet); - --iSet; - } else { - if (expLevel == pSet->diskId.level) continue; - - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - code = terrno; - goto _exit; - } - - if (did.level == pSet->diskId.level) continue; - - // copy file to new disk - SDFileSet fSet = *pSet; - fSet.diskId = did; - - code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed); - if (code) goto _exit; - - code = tsdbFSUpsertFSet(&fs, &fSet); - if (code) goto _exit; - } - } - } - -_merge_fs: taosThreadRwlockWrlock(&pTsdb->rwLock); - // 1) prepare fs, merge tsdbFSNew and pTsdb->fs if needed - STsdbFS *pTsdbFS = &fs; - ASSERT(fs.version <= pTsdb->fs.version); - - if (fs.version < pTsdb->fs.version) { - if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - - if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - pTsdbFS = &fsLatest; - } - - // 2) save CURRENT - if ((code = tsdbFSCommit1(pTsdb, pTsdbFS))) { + code = tsdbFSCommit2(pTsdb, &fs); + if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; + goto _err; } - // 3) apply the tsdbFS to pTsdb->fs - if ((code = tsdbFSCommit2(pTsdb, pTsdbFS))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } taosThreadRwlockUnlock(&pTsdb->rwLock); - if (type == RETENTION_MIGRATE) { - ++nBatch; - goto _retention_loop; - } - -_exit: tsdbFSDestroy(&fs); - tsdbFSDestroy(&fsLatest); - if (code != 0) { - tsdbError("vgId:%d, tsdb do retention %" PRIi8 " failed since %s", TD_VID(pTsdb->pVnode), type, tstrerror(code)); - ASSERT(0); - } - return code; -} - -/** - * @brief Data migration between multi-tier storage, including remove expired data. - * 1) firstly, remove expired DFileSet; - * 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate - * DFileSet groups between multi-tier storage; - * 3) update the tsdbFS and CURRENT in the same transaction; - * 4) finish the migration - * @param pTsdb - * @param now - * @param maxSpeed - * @return int32_t - */ -int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) { - int32_t code = 0; - int32_t retention = RETENTION_NO; - - retention = tsdbShouldDoRetention(pTsdb, now); - if (retention == RETENTION_NO) { - goto _exit; - } - - // step 1: process expire - code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_EXPIRED); - if (code < 0) goto _exit; - - // step 2: process multi-tier migration - code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_MIGRATE); - if (code < 0) goto _exit; _exit: - pTsdb->trimHdl.maxRetentFid = INT32_MIN; - if (code != 0) { - tsdbError("vgId:%d, tsdb do retention %d failed since %s, time:%" PRIi64 ", max speed:%" PRIi64, - TD_VID(pTsdb->pVnode), retention, tstrerror(code), now, maxSpeed); - ASSERT(0); - // tsdbFSRollback(pTsdb->pFS); - } else { - tsdbInfo("vgId:%d, tsdb do retention %d succeed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pTsdb->pVnode), - retention, now, maxSpeed); - } + return code; + +_err: + tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + ASSERT(0); + // tsdbFSRollback(pTsdb->pFS); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 5e76e2b4d2..4999e7a49a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -509,24 +509,16 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK *maxKey = *minKey + minutes * tsTickPerMin[precision] - 1; } -/** - * @brief get fid level by keep and days. - * - * @param fid - * @param pKeepCfg - * @param now millisecond - * @return int32_t - */ int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) { int32_t aFid[3]; TSKEY key; if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) { - // now = now * 1000; + now = now * 1000; } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) { - now = now * 1000l; - } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) { now = now * 1000000l; + } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) { + now = now * 1000000000l; } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 481c8b66be..4ccfea4051 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -186,17 +186,6 @@ void vnodePreClose(SVnode *pVnode) { } } -static void vnodeTrimDbClose(SVnode *pVnode) { - int32_t nLoops = 0; - while (atomic_load_8(&pVnode->trimDbH.state) != 0) { - if (++nLoops > 1000) { - vTrace("vgId:%d, wait for trimDb task to finish", TD_VID(pVnode)); - sched_yield(); - nLoops = 0; - } - } -} - void vnodeClose(SVnode *pVnode) { if (pVnode) { vnodeCommit(pVnode); @@ -208,7 +197,6 @@ void vnodeClose(SVnode *pVnode) { smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); - vnodeTrimDbClose(pVnode); // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 464fd50ab0..6e9eba306a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -378,7 +378,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } -#if 0 static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVTrimDbReq trimReq = {0}; @@ -389,7 +388,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, goto _exit; } - vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, pVnode->config.vgId, trimReq.timestamp); + vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); // process code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); @@ -401,91 +400,6 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, _exit: return code; } -#endif - -typedef struct { - SVnode *pVnode; - SVTrimDbReq trimReq; -} SVndTrimDbReq; - -void *vnodeProcessTrimReqFunc(void *param) { - int32_t code = 0; - int8_t oldVal = 0; - SVndTrimDbReq *pReq = (SVndTrimDbReq *)param; - SVnode *pVnode = pReq->pVnode; - - setThreadName("vnode-trim"); - - // process - code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); - if (code) goto _exit; - - code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); - if (code) goto _exit; -_exit: - oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0); - ASSERT(oldVal == 1); - if (code) { - vError("vgId:%d, trim vnode thread failed since %s, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), - tstrerror(code), pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); - } else { - vInfo("vgId:%d, trim vnode thread finish, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), - pReq->trimReq.timestamp, pReq->trimReq.maxSpeed); - } - taosMemoryFree(pReq); - return NULL; -} - -static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - int32_t code = 0; - SVndTrimDbReq *pVndTrimReq = taosMemoryMalloc(sizeof(SVndTrimDbReq)); - SVTrimDbHdl *pHandle = &pVnode->trimDbH; - - if (!pVndTrimReq) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - pVndTrimReq->pVnode = pVnode; - - if (tDeserializeSVTrimDbReq(pReq, len, &pVndTrimReq->trimReq) != 0) { - taosMemoryFree(pVndTrimReq); - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - - if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) { - vInfo("vgId:%d, trim vnode request ignored since duplicated req, time:%" PRIi64 ", max speed:%" PRIi64, - TD_VID(pVnode), pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); - taosMemoryFree(pVndTrimReq); - goto _exit; - } - - vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), - pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); - - TdThreadAttr thAttr = {0}; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); - - TdThread tid; - if (taosThreadCreate(&tid, &thAttr, vnodeProcessTrimReqFunc, (void *)pVndTrimReq) != 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pVndTrimReq); - taosThreadAttrDestroy(&thAttr); - int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0); - ASSERT(oldVal == 1); - vError("vgId:%d, failed to create pthread to trim vnode since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } - vDebug("vgId:%d, success to create pthread to trim vnode", TD_VID(pVnode)); - - taosThreadAttrDestroy(&thAttr); - -_exit: - terrno = code; - return code; -} static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); @@ -1007,8 +921,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), - (int32_t)taosArrayGetSize(newTbUids)); + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); } tqUpdateTbUidList(pVnode->pTq, newTbUids, true);