diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 262bd083ef..6611baa2f4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -265,7 +265,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0a2d37bdbc..bc9c37fc76 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -763,6 +763,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { _wait_retention_end: while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { + atomic_val_compare_exchange_8(&pTsdb->trimHdl.limitSpeed, 1, 0); if (++nLoops > 1000) { nLoops = 0; sched_yield(); @@ -778,6 +779,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { } else { goto _wait_retention_end; } + atomic_store_8(&pTsdb->trimHdl.limitSpeed, 1); } code = tsdbFSCopy(pTsdb, &pCommitter->fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 5fe0b408b1..615b383e65 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,7 +607,43 @@ _err: return code; } -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { +/** + * @brief send file with limited speed(rough control) + * + * @param pFileOut + * @param pFileIn + * @param size + * @param speed 0 no limit, unit: B/s + * @return int64_t + */ +static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int32_t speed) { + if (speed <= 0) { + return taosFSendFile(pOutFD, pInFD, 0, size); + } + + int64_t offset = 0; + int64_t nBytes = 0; + int64_t startMs = 0; + int64_t endMs = 0; + int64_t cost = 0; + while ((offset + speed) < size) { + startMs = taosGetTimestampMs(); + nBytes += taosFSendFile(pOutFD, pInFD, &offset, speed); + cost = taosGetTimestampMs() - startMs; + + if (cost < 0) { + taosMsleep(1000); + } else if (cost < 1000) { + taosMsleep(1000 - cost); + } + } + if (offset < size) { + nBytes += taosFSendFile(pOutFD, pInFD, &offset, size - offset); + } + return nBytes; +} + +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed) { int32_t code = 0; int64_t n; int64_t size; @@ -616,6 +652,12 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { int32_t szPage = pTsdb->pVnode->config.szPage; char fNameFrom[TSDB_FILENAME_LEN]; char fNameTo[TSDB_FILENAME_LEN]; + int32_t speed = 0; + + if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) { + ASSERT(maxSpeed > 0); + speed = maxSpeed; + } // head tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); @@ -630,7 +672,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage)); + n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -651,7 +693,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage)); + n = tsdbFSendFile(pOutFD, PInFD, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -672,7 +714,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage)); + n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -694,7 +736,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); + n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c index a5a16a9602..34b8e0018f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -245,7 +245,7 @@ _wait_commit_end: SDFileSet fSet = *pSet; fSet.diskId = did; - code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); + code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed); if (code) goto _exit; code = tsdbFSUpsertFSet(&fs, &fSet);