chore: data migration support specify max speed
This commit is contained in:
parent
6c682daf42
commit
43c954b6e7
|
@ -875,7 +875,7 @@ int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t timestamp; // unit: millisecond
|
int64_t timestamp; // unit: millisecond
|
||||||
int32_t maxSpeed; // 0 no limit, unit: bit/s
|
int64_t maxSpeed; // 0 no limit, unit: Byte/s
|
||||||
} SVTrimDbReq;
|
} SVTrimDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
|
int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
|
||||||
|
|
|
@ -2709,7 +2709,7 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->maxSpeed) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->maxSpeed) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -2723,7 +2723,7 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->maxSpeed) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->maxSpeed) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -1385,11 +1385,19 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
/**
|
||||||
|
* @brief trim database
|
||||||
|
*
|
||||||
|
* @param pMnode
|
||||||
|
* @param pDb
|
||||||
|
* @param maxSpeed MB/s
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 2}; // TODO: use specified maxSpeed
|
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = maxSpeed << 20};
|
||||||
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
|
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
|
||||||
int32_t contLen = reqLen + sizeof(SMsgHead);
|
int32_t contLen = reqLen + sizeof(SMsgHead);
|
||||||
|
|
||||||
|
@ -1413,7 +1421,8 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
|
mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64, pVgroup->vgId, trimReq.timestamp);
|
mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64 ", max speed:%" PRIi64, pVgroup->vgId,
|
||||||
|
trimReq.timestamp, trimReq.maxSpeed);
|
||||||
}
|
}
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
@ -1443,7 +1452,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndTrimDb(pMnode, pDb);
|
code = mndTrimDb(pMnode, pDb, trimReq.maxSpeed);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -264,7 +264,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
||||||
int8_t cmprAlg, int8_t toLast);
|
int8_t cmprAlg, int8_t toLast);
|
||||||
|
|
||||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed);
|
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed);
|
||||||
// SDataFReader
|
// SDataFReader
|
||||||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||||
|
@ -320,8 +320,8 @@ struct STsdbFS {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STsdbTrimHdl {
|
struct STsdbTrimHdl {
|
||||||
volatile int8_t state; // 0 idle 1 in use
|
volatile int8_t state; // 0 idle 1 in use
|
||||||
volatile int8_t limitSpeed; // 0 no limit, 1 with limit
|
volatile int8_t commitInWait; // 0 not in wait, 1 in wait
|
||||||
volatile int32_t maxRetentFid;
|
volatile int32_t maxRetentFid;
|
||||||
volatile int32_t minCommitFid;
|
volatile int32_t minCommitFid;
|
||||||
};
|
};
|
||||||
|
|
|
@ -145,7 +145,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
int32_t tsdbBegin(STsdb* pTsdb);
|
int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
int32_t tsdbCommit(STsdb* pTsdb);
|
int32_t tsdbCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int32_t maxSpeed);
|
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int64_t maxSpeed);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
|
@ -200,7 +200,7 @@ int32_t smaSyncPostCommit(SSma* pSma);
|
||||||
int32_t smaAsyncPreCommit(SSma* pSma);
|
int32_t smaAsyncPreCommit(SSma* pSma);
|
||||||
int32_t smaAsyncCommit(SSma* pSma);
|
int32_t smaAsyncCommit(SSma* pSma);
|
||||||
int32_t smaAsyncPostCommit(SSma* pSma);
|
int32_t smaAsyncPostCommit(SSma* pSma);
|
||||||
int32_t smaDoRetention(SSma* pSma, int64_t now, int32_t maxSpeed);
|
int32_t smaDoRetention(SSma* pSma, int64_t now, int64_t maxSpeed);
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
|
@ -664,7 +664,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||||
* @param maxSpeed
|
* @param maxSpeed
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaDoRetention(SSma *pSma, int64_t now, int32_t maxSpeed) {
|
int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (!VND_IS_RSMA(pSma->pVnode)) {
|
if (!VND_IS_RSMA(pSma->pVnode)) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -763,10 +763,11 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
|
|
||||||
_wait_retention_end:
|
_wait_retention_end:
|
||||||
while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) {
|
while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) {
|
||||||
atomic_val_compare_exchange_8(&pTsdb->trimHdl.limitSpeed, 1, 0);
|
atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 0, 1);
|
||||||
if (++nLoops > 1000) {
|
if (++nLoops > 1000) {
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
printf("%s:%d wait retention to finish\n", __func__, __LINE__);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
||||||
|
@ -779,7 +780,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
} else {
|
} else {
|
||||||
goto _wait_retention_end;
|
goto _wait_retention_end;
|
||||||
}
|
}
|
||||||
atomic_store_8(&pTsdb->trimHdl.limitSpeed, 1);
|
atomic_store_8(&pTsdb->trimHdl.commitInWait, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
|
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
|
||||||
|
|
|
@ -73,7 +73,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
|
|
||||||
pTsdb->trimHdl.maxRetentFid = INT32_MIN;
|
pTsdb->trimHdl.maxRetentFid = INT32_MIN;
|
||||||
pTsdb->trimHdl.minCommitFid = INT32_MAX;
|
pTsdb->trimHdl.minCommitFid = INT32_MAX;
|
||||||
pTsdb->trimHdl.limitSpeed = 1;
|
pTsdb->trimHdl.commitInWait = 0;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
||||||
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
||||||
|
|
|
@ -610,13 +610,14 @@ _err:
|
||||||
/**
|
/**
|
||||||
* @brief send file with limited speed(rough control)
|
* @brief send file with limited speed(rough control)
|
||||||
*
|
*
|
||||||
|
* @param pTsdb
|
||||||
* @param pFileOut
|
* @param pFileOut
|
||||||
* @param pFileIn
|
* @param pFileIn
|
||||||
* @param size
|
* @param size
|
||||||
* @param speed 0 no limit, unit: B/s
|
* @param speed 0 no limit, unit: B/s
|
||||||
* @return int64_t
|
* @return int64_t
|
||||||
*/
|
*/
|
||||||
static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int32_t speed) {
|
static int64_t tsdbFSendFile(STsdb *pTsdb, TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int64_t speed) {
|
||||||
if (speed <= 0) {
|
if (speed <= 0) {
|
||||||
return taosFSendFile(pOutFD, pInFD, 0, size);
|
return taosFSendFile(pOutFD, pInFD, 0, size);
|
||||||
}
|
}
|
||||||
|
@ -626,9 +627,15 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in
|
||||||
int64_t nBytes = 0;
|
int64_t nBytes = 0;
|
||||||
int64_t startMs = 0;
|
int64_t startMs = 0;
|
||||||
int64_t cost = 0;
|
int64_t cost = 0;
|
||||||
while ((offset + speed) < size) {
|
|
||||||
|
while ((offset + nBytes) < size) {
|
||||||
|
if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) {
|
||||||
|
tsdbInfo("vgId:%d sendFile without limit since conflicts, fSize:%" PRIi64 ", maxSpeed:%" PRIi64,
|
||||||
|
TD_VID(pTsdb->pVnode), size, speed);
|
||||||
|
goto _send_remain;
|
||||||
|
}
|
||||||
startMs = taosGetTimestampMs();
|
startMs = taosGetTimestampMs();
|
||||||
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed)) < 0) {
|
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed / 10)) < 0) {
|
||||||
return nBytes;
|
return nBytes;
|
||||||
}
|
}
|
||||||
cost = taosGetTimestampMs() - startMs;
|
cost = taosGetTimestampMs() - startMs;
|
||||||
|
@ -642,20 +649,24 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in
|
||||||
}
|
}
|
||||||
if (nSleep > 0) {
|
if (nSleep > 0) {
|
||||||
taosMsleep(nSleep);
|
taosMsleep(nSleep);
|
||||||
tsdbDebug("sendFile from %p to %p, fSize:%" PRIi64 ", maxSpeed:%d, msleep:%" PRIi64, pOutFD, pInFD, size, speed,
|
tsdbInfo("vgId:%d sendFile and msleep:%" PRIi64 ", fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64,
|
||||||
nSleep);
|
TD_VID(pTsdb->pVnode), nSleep, size, tBytes, speed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_send_remain:
|
||||||
if (offset < size) {
|
if (offset < size) {
|
||||||
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) {
|
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) {
|
||||||
return nBytes;
|
return nBytes;
|
||||||
}
|
}
|
||||||
tBytes += nBytes;
|
tBytes += nBytes;
|
||||||
|
tsdbInfo("vgId:%d sendFile remain, fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, TD_VID(pTsdb->pVnode),
|
||||||
|
size, tBytes, speed);
|
||||||
}
|
}
|
||||||
return tBytes;
|
return tBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed) {
|
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t n;
|
int64_t n;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
@ -664,13 +675,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
||||||
int32_t szPage = pTsdb->pVnode->config.szPage;
|
int32_t szPage = pTsdb->pVnode->config.szPage;
|
||||||
char fNameFrom[TSDB_FILENAME_LEN];
|
char fNameFrom[TSDB_FILENAME_LEN];
|
||||||
char fNameTo[TSDB_FILENAME_LEN];
|
char fNameTo[TSDB_FILENAME_LEN];
|
||||||
int32_t speed = 0;
|
|
||||||
int64_t fStatSize = 0;
|
int64_t fStatSize = 0;
|
||||||
|
|
||||||
if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) {
|
|
||||||
speed = maxSpeed;
|
|
||||||
}
|
|
||||||
|
|
||||||
// head
|
// head
|
||||||
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
|
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
|
||||||
tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
|
tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
|
||||||
|
@ -687,7 +693,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
||||||
fStatSize = 0;
|
fStatSize = 0;
|
||||||
taosStatFile(fNameFrom, &fStatSize, 0);
|
taosStatFile(fNameFrom, &fStatSize, 0);
|
||||||
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
|
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
|
||||||
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed);
|
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), maxSpeed);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -711,7 +717,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
||||||
fStatSize = 0;
|
fStatSize = 0;
|
||||||
taosStatFile(fNameFrom, &fStatSize, 0);
|
taosStatFile(fNameFrom, &fStatSize, 0);
|
||||||
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage));
|
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage));
|
||||||
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), speed);
|
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), maxSpeed);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -735,7 +741,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
||||||
fStatSize = 0;
|
fStatSize = 0;
|
||||||
taosStatFile(fNameFrom, &fStatSize, 0);
|
taosStatFile(fNameFrom, &fStatSize, 0);
|
||||||
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
|
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
|
||||||
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed);
|
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), maxSpeed);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -757,10 +763,10 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
fStatSize = 0;
|
fStatSize = 0;
|
||||||
taosStatFile(fNameFrom, &fStatSize, 0);
|
taosStatFile(fNameFrom, &fStatSize, 0);
|
||||||
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
|
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
|
||||||
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed);
|
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), maxSpeed);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -30,7 +30,7 @@ enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 };
|
||||||
static bool tsdbShouldDoMigrate(STsdb *pTsdb);
|
static bool tsdbShouldDoMigrate(STsdb *pTsdb);
|
||||||
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
|
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
|
||||||
static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention);
|
static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention);
|
||||||
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention);
|
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention);
|
||||||
|
|
||||||
static bool tsdbShouldDoMigrate(STsdb *pTsdb) {
|
static bool tsdbShouldDoMigrate(STsdb *pTsdb) {
|
||||||
if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) {
|
if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) {
|
||||||
|
@ -109,6 +109,7 @@ _wait_commit_end:
|
||||||
if (++nLoops > 1000) {
|
if (++nLoops > 1000) {
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
sched_yield();
|
sched_yield();
|
||||||
|
printf("%s:%d wait commit finished\n", __func__, __LINE__);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
||||||
|
@ -163,7 +164,7 @@ _exit:
|
||||||
* @param retention
|
* @param retention
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention) {
|
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t nBatch = 0;
|
int32_t nBatch = 0;
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
|
@ -183,6 +184,11 @@ _migrate_loop:
|
||||||
tsdbFSDestroy(&fs);
|
tsdbFSDestroy(&fs);
|
||||||
tsdbFSDestroy(&fsLatest);
|
tsdbFSDestroy(&fsLatest);
|
||||||
|
|
||||||
|
if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) {
|
||||||
|
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN);
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbFSCopy(pTsdb, &fs);
|
code = tsdbFSCopy(pTsdb, &fs);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
@ -315,7 +321,7 @@ _exit:
|
||||||
* @param maxSpeed
|
* @param maxSpeed
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int32_t maxSpeed) {
|
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t retention = RETENTION_NO;
|
int32_t retention = RETENTION_NO;
|
||||||
|
|
||||||
|
|
|
@ -455,13 +455,14 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) {
|
if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) {
|
||||||
vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%" PRIi64, TD_VID(pVnode),
|
vInfo("vgId:%d, trim vnode request ignored since duplicated req, time:%" PRIi64 ", max speed:%" PRIi64,
|
||||||
pVndTrimReq->trimReq.timestamp);
|
TD_VID(pVnode), pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed);
|
||||||
taosMemoryFree(pVndTrimReq);
|
taosMemoryFree(pVndTrimReq);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, TD_VID(pVnode), pVndTrimReq->trimReq.timestamp);
|
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode),
|
||||||
|
pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed);
|
||||||
|
|
||||||
TdThreadAttr thAttr = {0};
|
TdThreadAttr thAttr = {0};
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
|
@ -474,10 +475,10 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0);
|
int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0);
|
||||||
ASSERT(oldVal == 1);
|
ASSERT(oldVal == 1);
|
||||||
vError("vgId:%d, failed to create pthread for trim vnode since %s", TD_VID(pVnode), tstrerror(code));
|
vError("vgId:%d, failed to create pthread to trim vnode since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
vDebug("vgId:%d, success to create pthread for trim vnode", TD_VID(pVnode));
|
vDebug("vgId:%d, success to create pthread to trim vnode", TD_VID(pVnode));
|
||||||
|
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue