Revert "enh(tsdb): data migrate should not block data r/w and commit"
This commit is contained in:
parent
fa8593e9ca
commit
1fe66cb846
|
@ -873,8 +873,7 @@ int32_t tSerializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
|
|||
int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t timestamp; // unit: millisecond
|
||||
int64_t maxSpeed; // 0 no limit, unit: Byte/s
|
||||
int32_t timestamp;
|
||||
} SVTrimDbReq;
|
||||
|
||||
int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
|
||||
|
|
|
@ -2708,8 +2708,7 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
|
|||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->maxSpeed) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2722,8 +2721,7 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
|
|||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->maxSpeed) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -1385,19 +1385,11 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief trim database
|
||||
*
|
||||
* @param pMnode
|
||||
* @param pDb
|
||||
* @param maxSpeed MB/s
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) {
|
||||
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = maxSpeed << 20};
|
||||
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampSec()};
|
||||
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
|
||||
int32_t contLen = reqLen + sizeof(SMsgHead);
|
||||
|
||||
|
@ -1421,8 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) {
|
|||
if (code != 0) {
|
||||
mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
|
||||
} else {
|
||||
mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64 ", max speed:%" PRIi64, pVgroup->vgId,
|
||||
trimReq.timestamp, trimReq.maxSpeed);
|
||||
mInfo("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
@ -1452,7 +1443,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndTrimDb(pMnode, pDb, trimReq.maxSpeed);
|
||||
code = mndTrimDb(pMnode, pDb);
|
||||
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
|
|
|
@ -64,7 +64,6 @@ typedef struct SDelFWriter SDelFWriter;
|
|||
typedef struct SDelFReader SDelFReader;
|
||||
typedef struct SRowIter SRowIter;
|
||||
typedef struct STsdbFS STsdbFS;
|
||||
typedef struct STsdbTrimHdl STsdbTrimHdl;
|
||||
typedef struct SRowMerger SRowMerger;
|
||||
typedef struct STsdbReadSnap STsdbReadSnap;
|
||||
typedef struct SBlockInfo SBlockInfo;
|
||||
|
@ -249,7 +248,6 @@ int32_t tsdbFSClose(STsdb *pTsdb);
|
|||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSDestroy(STsdbFS *pFS);
|
||||
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid);
|
||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
||||
|
@ -270,7 +268,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
|||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
||||
int8_t cmprAlg, int8_t toLast);
|
||||
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed);
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
||||
// SDataFReader
|
||||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||
|
@ -322,18 +320,10 @@ int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSc
|
|||
|
||||
// structs =======================
|
||||
struct STsdbFS {
|
||||
int64_t version;
|
||||
SDelFile *pDelFile;
|
||||
SArray *aDFileSet; // SArray<SDFileSet>
|
||||
};
|
||||
|
||||
struct STsdbTrimHdl {
|
||||
volatile int8_t state; // 0 idle 1 in use
|
||||
volatile int8_t commitInWait; // 0 not in wait, 1 in wait
|
||||
volatile int32_t maxRetentFid;
|
||||
volatile int32_t minCommitFid;
|
||||
};
|
||||
|
||||
struct STsdb {
|
||||
char *path;
|
||||
SVnode *pVnode;
|
||||
|
@ -342,7 +332,6 @@ struct STsdb {
|
|||
SMemTable *mem;
|
||||
SMemTable *imem;
|
||||
STsdbFS fs;
|
||||
STsdbTrimHdl trimHdl;
|
||||
SLRUCache *lruCache;
|
||||
TdThreadMutex lruMutex;
|
||||
};
|
||||
|
|
|
@ -58,7 +58,6 @@ typedef struct STQ STQ;
|
|||
typedef struct SVState SVState;
|
||||
typedef struct SVBufPool SVBufPool;
|
||||
typedef struct SQWorker SQHandle;
|
||||
typedef struct SVTrimDbHdl SVTrimDbHdl;
|
||||
typedef struct STsdbKeepCfg STsdbKeepCfg;
|
||||
typedef struct SMetaSnapReader SMetaSnapReader;
|
||||
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
||||
|
@ -145,7 +144,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
|
|||
int tsdbClose(STsdb** pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int64_t maxSpeed);
|
||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||
|
@ -200,7 +199,7 @@ int32_t smaSyncPostCommit(SSma* pSma);
|
|||
int32_t smaAsyncPreCommit(SSma* pSma);
|
||||
int32_t smaAsyncCommit(SSma* pSma);
|
||||
int32_t smaAsyncPostCommit(SSma* pSma);
|
||||
int32_t smaDoRetention(SSma* pSma, int64_t now, int64_t maxSpeed);
|
||||
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||
|
||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
@ -302,9 +301,6 @@ struct STsdbKeepCfg {
|
|||
int32_t keep1;
|
||||
int32_t keep2;
|
||||
};
|
||||
struct SVTrimDbHdl {
|
||||
volatile int8_t state; // 0 not in trim, 1 in trim
|
||||
};
|
||||
|
||||
struct SVnode {
|
||||
char* path;
|
||||
|
@ -329,7 +325,6 @@ struct SVnode {
|
|||
bool restored;
|
||||
tsem_t syncSem;
|
||||
SQHandle* pQuery;
|
||||
SVTrimDbHdl trimDbH;
|
||||
};
|
||||
|
||||
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
||||
|
|
|
@ -661,10 +661,9 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
|||
*
|
||||
* @param pSma
|
||||
* @param now
|
||||
* @param maxSpeed
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) {
|
||||
int32_t smaDoRetention(SSma *pSma, int64_t now) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!VND_IS_RSMA(pSma->pVnode)) {
|
||||
return code;
|
||||
|
@ -672,7 +671,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) {
|
|||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now, maxSpeed);
|
||||
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
|
||||
if (code) goto _end;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -756,32 +756,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pTsdb->imem->nRow > 0) {
|
||||
int32_t minCommitFid = tsdbKeyFid(pTsdb->imem->minKey, pCommitter->minutes, pCommitter->precision);
|
||||
int32_t nLoops = 0;
|
||||
|
||||
_wait_retention_end:
|
||||
while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) {
|
||||
atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 0, 1);
|
||||
if (++nLoops > 1000) {
|
||||
nLoops = 0;
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
||||
if (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) {
|
||||
atomic_store_8(&pTsdb->trimHdl.state, 0);
|
||||
goto _wait_retention_end;
|
||||
}
|
||||
atomic_store_32(&pTsdb->trimHdl.minCommitFid, minCommitFid);
|
||||
atomic_store_8(&pTsdb->trimHdl.state, 0);
|
||||
} else {
|
||||
goto _wait_retention_end;
|
||||
}
|
||||
atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 1, 0);
|
||||
}
|
||||
|
||||
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -988,38 +962,20 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
|||
int32_t code = 0;
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
SMemTable *pMemTable = pTsdb->imem;
|
||||
STsdbFS fsLatest = {0};
|
||||
|
||||
ASSERT(eno == 0);
|
||||
|
||||
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
ASSERT(pCommitter->fs.version <= pTsdb->fs.version);
|
||||
|
||||
if (pCommitter->fs.version < pTsdb->fs.version) {
|
||||
if ((code = tsdbFSCopy(pTsdb, &fsLatest))) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((code = tsdbFSUpdDel(pTsdb, &pCommitter->fs, &fsLatest, pTsdb->trimHdl.minCommitFid - 1))) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// commit or rollback
|
||||
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pTsdb->imem = NULL;
|
||||
|
@ -1027,23 +983,20 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
|||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
|
||||
_exit:
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
tsdbFSDestroy(&pCommitter->fs);
|
||||
tsdbFSDestroy(&fsLatest);
|
||||
taosArrayDestroy(pCommitter->aTbDataP);
|
||||
atomic_store_32(&pTsdb->trimHdl.minCommitFid, INT32_MAX);
|
||||
|
||||
// if (pCommitter->toMerge) {
|
||||
// code = tsdbMerge(pTsdb);
|
||||
// if (code) goto _err;
|
||||
// }
|
||||
if(code == 0) {
|
||||
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
} else {
|
||||
tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
}
|
||||
|
||||
|
||||
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -250,7 +250,7 @@ _err:
|
|||
|
||||
void tsdbFSDestroy(STsdbFS *pFS) {
|
||||
if (pFS->pDelFile) {
|
||||
taosMemoryFreeClear(pFS->pDelFile);
|
||||
taosMemoryFree(pFS->pDelFile);
|
||||
}
|
||||
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
|
||||
|
@ -263,7 +263,7 @@ void tsdbFSDestroy(STsdbFS *pFS) {
|
|||
}
|
||||
}
|
||||
|
||||
pFS->aDFileSet = taosArrayDestroy(pFS->aDFileSet);
|
||||
taosArrayDestroy(pFS->aDFileSet);
|
||||
}
|
||||
|
||||
static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||
|
@ -419,7 +419,6 @@ int32_t tsdbFSOpen(STsdb *pTsdb) {
|
|||
int32_t code = 0;
|
||||
|
||||
// open handle
|
||||
pTsdb->fs.version = 0;
|
||||
pTsdb->fs.pDelFile = NULL;
|
||||
pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
|
||||
if (pTsdb->fs.aDFileSet == NULL) {
|
||||
|
@ -535,7 +534,6 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
|
|||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
|
||||
pFS->version = pTsdb->fs.version;
|
||||
pFS->pDelFile = NULL;
|
||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
||||
if (pFS->aDFileSet == NULL) {
|
||||
|
@ -666,9 +664,6 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
|||
}
|
||||
}
|
||||
|
||||
// update the diskId
|
||||
pDFileSet->diskId = pSet->diskId;
|
||||
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
@ -717,108 +712,6 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Update or delete DFileSet in pFS according to DFileSet (fid <= maxFid) in pFSNew.
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param pFS
|
||||
* @param pFSNew
|
||||
* @param maxFid
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid) {
|
||||
int32_t code = 0;
|
||||
int32_t nRef = 0;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
int32_t iOld = 0;
|
||||
int32_t iNew = 0;
|
||||
while (true) {
|
||||
int32_t nOld = taosArrayGetSize(pFS->aDFileSet);
|
||||
int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet);
|
||||
SDFileSet fSet;
|
||||
int8_t sameDisk;
|
||||
|
||||
if (iOld >= nOld && iNew >= nNew) break;
|
||||
|
||||
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pFS->aDFileSet, iOld) : NULL;
|
||||
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;
|
||||
|
||||
if (pSetNew && (pSetNew->fid > maxFid)) break;
|
||||
|
||||
if (pSetOld && pSetNew) {
|
||||
if (pSetOld->fid == pSetNew->fid) {
|
||||
goto _merge_migrate;
|
||||
} else if (pSetOld->fid > pSetNew->fid) {
|
||||
goto _remove_old;
|
||||
} else {
|
||||
++iOld;
|
||||
ASSERT(0);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
_merge_migrate:
|
||||
sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
|
||||
|
||||
ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID);
|
||||
ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size);
|
||||
ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset);
|
||||
|
||||
if (!sameDisk) {
|
||||
// head
|
||||
*pSetOld->pHeadF = *pSetNew->pHeadF;
|
||||
pSetOld->pHeadF->nRef = 1;
|
||||
|
||||
// data
|
||||
ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size);
|
||||
*pSetOld->pDataF = *pSetNew->pDataF;
|
||||
pSetOld->pDataF->nRef = 1;
|
||||
|
||||
// sma
|
||||
ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size);
|
||||
*pSetOld->pSmaF = *pSetNew->pSmaF;
|
||||
pSetOld->pSmaF->nRef = 1;
|
||||
|
||||
// stt
|
||||
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);
|
||||
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
}
|
||||
|
||||
// set diskId
|
||||
pSetOld->diskId = pSetNew->diskId;
|
||||
}
|
||||
|
||||
++iOld;
|
||||
++iNew;
|
||||
continue;
|
||||
|
||||
_remove_old:
|
||||
taosMemoryFree(pSetOld->pHeadF);
|
||||
taosMemoryFree(pSetOld->pDataF);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
|
||||
taosMemoryFree(pSetOld->aSttF[iStt]);
|
||||
}
|
||||
taosMemoryFree(pSetOld->pSmaF);
|
||||
taosArrayRemove(pFS->aDFileSet, iOld);
|
||||
++iNew;
|
||||
continue;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb fs upd/del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||
int32_t code = 0;
|
||||
char tfname[TSDB_FILENAME_LEN];
|
||||
|
@ -852,8 +745,6 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
int32_t nRef;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
++pTsdb->fs.version;
|
||||
|
||||
// del
|
||||
if (pFSNew->pDelFile) {
|
||||
SDelFile *pDelFile = pTsdb->fs.pDelFile;
|
||||
|
@ -1030,8 +921,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
} else {
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,10 +71,6 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pTsdb->trimHdl.maxRetentFid = INT32_MIN;
|
||||
pTsdb->trimHdl.minCommitFid = INT32_MAX;
|
||||
pTsdb->trimHdl.commitInWait = 0;
|
||||
|
||||
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
||||
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
||||
|
||||
|
|
|
@ -607,66 +607,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief send file with limited speed(rough control)
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param pFileOut
|
||||
* @param pFileIn
|
||||
* @param size
|
||||
* @param speed 0 no limit, unit: B/s
|
||||
* @return int64_t
|
||||
*/
|
||||
static int64_t tsdbFSendFile(STsdb *pTsdb, TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int64_t speed) {
|
||||
if (speed <= 0) {
|
||||
return taosFSendFile(pOutFD, pInFD, 0, size);
|
||||
}
|
||||
|
||||
int64_t offset = 0;
|
||||
int64_t tBytes = 0;
|
||||
int64_t nBytes = 0;
|
||||
int64_t startMs = 0;
|
||||
int64_t cost = 0;
|
||||
|
||||
while ((offset + nBytes) < size) {
|
||||
if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) {
|
||||
tsdbDebug("vgId:%d sendFile without limit since conflicts, fSize:%" PRIi64 ", maxSpeed:%" PRIi64,
|
||||
TD_VID(pTsdb->pVnode), size, speed);
|
||||
goto _send_remain;
|
||||
}
|
||||
startMs = taosGetTimestampMs();
|
||||
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed)) < 0) {
|
||||
return nBytes;
|
||||
}
|
||||
cost = taosGetTimestampMs() - startMs;
|
||||
tBytes += nBytes;
|
||||
|
||||
int64_t nSleep = 0;
|
||||
if (cost < 0) {
|
||||
nSleep = 1000;
|
||||
} else if (cost < 1000) {
|
||||
nSleep = 1000 - cost;
|
||||
}
|
||||
if (nSleep > 0) {
|
||||
taosMsleep(nSleep);
|
||||
tsdbDebug("vgId:%d sendFile and msleep:%" PRIi64 ", fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64,
|
||||
TD_VID(pTsdb->pVnode), nSleep, size, tBytes, speed);
|
||||
}
|
||||
}
|
||||
|
||||
_send_remain:
|
||||
if (offset < size) {
|
||||
if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) {
|
||||
return nBytes;
|
||||
}
|
||||
tBytes += nBytes;
|
||||
tsdbDebug("vgId:%d sendFile remain, fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, TD_VID(pTsdb->pVnode),
|
||||
size, tBytes, speed);
|
||||
}
|
||||
return tBytes;
|
||||
}
|
||||
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed) {
|
||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
int64_t size;
|
||||
|
@ -679,7 +620,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
// head
|
||||
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
|
||||
tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
|
||||
pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -689,7 +630,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), maxSpeed);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -700,7 +641,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
// data
|
||||
tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
|
||||
tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
|
||||
pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -710,7 +651,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), maxSpeed);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage));
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -721,7 +662,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
// sma
|
||||
tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
|
||||
tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);
|
||||
pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -731,7 +672,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), maxSpeed);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -743,7 +684,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
for (int8_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) {
|
||||
tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[iStt], fNameFrom);
|
||||
tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[iStt], fNameTo);
|
||||
pOutFD = taosCreateFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
@ -753,7 +694,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
|
|||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), maxSpeed);
|
||||
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
|
|
|
@ -15,296 +15,98 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 };
|
||||
|
||||
#define MIGRATE_MAX_SPEED (1048576 << 4) // 16 MB, vnode level
|
||||
#define MIGRATE_MIN_COST (5) // second
|
||||
|
||||
static bool tsdbShouldDoMigrate(STsdb *pTsdb);
|
||||
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
|
||||
static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type);
|
||||
|
||||
static bool tsdbShouldDoMigrate(STsdb *pTsdb) {
|
||||
if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) {
|
||||
return false;
|
||||
}
|
||||
|
||||
STsdbKeepCfg *keepCfg = &pTsdb->keepCfg;
|
||||
if (keepCfg->keep0 == keepCfg->keep1 && keepCfg->keep1 == keepCfg->keep2) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
|
||||
int32_t retention = RETENTION_NO;
|
||||
if (taosArrayGetSize(pTsdb->fs.aDFileSet) == 0) {
|
||||
return retention;
|
||||
}
|
||||
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, 0);
|
||||
if (tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now) < 0) {
|
||||
retention |= RETENTION_EXPIRED;
|
||||
}
|
||||
|
||||
if (!tsdbShouldDoMigrate(pTsdb)) {
|
||||
return retention;
|
||||
}
|
||||
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); ++iSet) {
|
||||
pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
SDiskID did;
|
||||
|
||||
if (expLevel == pSet->diskId.level) continue;
|
||||
|
||||
if (expLevel > 0) {
|
||||
retention |= RETENTION_MIGRATE;
|
||||
break;
|
||||
if (expLevel < 0) {
|
||||
return true;
|
||||
} else {
|
||||
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (did.level == pSet->diskId.level) continue;
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return retention;
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief process retention
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param now
|
||||
* @param maxSpeed
|
||||
* @param retention
|
||||
* @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type) {
|
||||
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||
int32_t code = 0;
|
||||
int32_t nBatch = 0;
|
||||
int32_t nLoops = 0;
|
||||
int32_t maxFid = 0;
|
||||
int64_t fSize = 0;
|
||||
int64_t speed = maxSpeed > 0 ? maxSpeed : MIGRATE_MAX_SPEED;
|
||||
STsdbFS fs = {0};
|
||||
STsdbFS fsLatest = {0};
|
||||
|
||||
if (!(retention & type)) {
|
||||
goto _exit;
|
||||
if (!tsdbShouldDoRetention(pTsdb, now)) {
|
||||
return code;
|
||||
}
|
||||
|
||||
_retention_loop:
|
||||
// reset
|
||||
maxFid = INT32_MIN;
|
||||
fSize = 0;
|
||||
tsdbFSDestroy(&fs);
|
||||
tsdbFSDestroy(&fsLatest);
|
||||
|
||||
if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) {
|
||||
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN);
|
||||
taosMsleep(50);
|
||||
}
|
||||
// do retention
|
||||
STsdbFS fs;
|
||||
|
||||
code = tsdbFSCopy(pTsdb, &fs);
|
||||
if (code) goto _exit;
|
||||
if (code) goto _err;
|
||||
|
||||
if (type == RETENTION_MIGRATE) {
|
||||
int32_t fsSize = taosArrayGetSize(fs.aDFileSet);
|
||||
for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
SDiskID did;
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
SDiskID did;
|
||||
|
||||
if (pSet->diskId.level == expLevel) continue;
|
||||
|
||||
if (expLevel > 0) {
|
||||
ASSERT(pSet->fid > maxFid);
|
||||
maxFid = pSet->fid;
|
||||
fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size);
|
||||
if (fSize / speed > MIGRATE_MIN_COST) {
|
||||
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
|
||||
break;
|
||||
}
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
|
||||
fSize += pSet->aSttF[iStt]->size;
|
||||
}
|
||||
if (fSize / speed > MIGRATE_MIN_COST) {
|
||||
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
|
||||
break;
|
||||
}
|
||||
if (expLevel < 0) {
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
taosMemoryFree(pSet->aSttF[0]);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
taosArrayRemove(fs.aDFileSet, iSet);
|
||||
iSet--;
|
||||
} else {
|
||||
if (expLevel == 0) continue;
|
||||
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
} else if (type == RETENTION_EXPIRED) {
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
SDiskID did;
|
||||
|
||||
if (expLevel < 0) {
|
||||
ASSERT(pSet->fid > maxFid);
|
||||
if (pSet->fid > maxFid) maxFid = pSet->fid;
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
|
||||
taosMemoryFree(pSet->aSttF[iStt]);
|
||||
}
|
||||
taosArrayRemove(fs.aDFileSet, iSet);
|
||||
--iSet;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
if (did.level == pSet->diskId.level) continue;
|
||||
|
||||
// copy file to new disk (todo)
|
||||
SDFileSet fSet = *pSet;
|
||||
fSet.diskId = did;
|
||||
|
||||
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSUpsertFSet(&fs, &fSet);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
if (maxFid == INT32_MIN) goto _exit;
|
||||
// do change fs
|
||||
code = tsdbFSCommit1(pTsdb, &fs);
|
||||
if (code) goto _err;
|
||||
|
||||
_commit_conflict_check:
|
||||
while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
|
||||
if (++nLoops > 1000) {
|
||||
nLoops = 0;
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
|
||||
if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
|
||||
atomic_store_8(&pTsdb->trimHdl.state, 0);
|
||||
goto _commit_conflict_check;
|
||||
}
|
||||
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid);
|
||||
atomic_store_8(&pTsdb->trimHdl.state, 0);
|
||||
} else {
|
||||
goto _commit_conflict_check;
|
||||
}
|
||||
|
||||
// migrate
|
||||
if (type == RETENTION_MIGRATE) {
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
|
||||
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
|
||||
SDiskID did;
|
||||
|
||||
if (pSet->fid > maxFid) break;
|
||||
|
||||
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode),
|
||||
nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel);
|
||||
|
||||
if (expLevel < 0) {
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
|
||||
taosMemoryFree(pSet->aSttF[iStt]);
|
||||
}
|
||||
taosArrayRemove(fs.aDFileSet, iSet);
|
||||
--iSet;
|
||||
} else {
|
||||
if (expLevel == pSet->diskId.level) continue;
|
||||
|
||||
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (did.level == pSet->diskId.level) continue;
|
||||
|
||||
// copy file to new disk
|
||||
SDFileSet fSet = *pSet;
|
||||
fSet.diskId = did;
|
||||
|
||||
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed);
|
||||
if (code) goto _exit;
|
||||
|
||||
code = tsdbFSUpsertFSet(&fs, &fSet);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_merge_fs:
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
// 1) prepare fs, merge tsdbFSNew and pTsdb->fs if needed
|
||||
STsdbFS *pTsdbFS = &fs;
|
||||
ASSERT(fs.version <= pTsdb->fs.version);
|
||||
|
||||
if (fs.version < pTsdb->fs.version) {
|
||||
if ((code = tsdbFSCopy(pTsdb, &fsLatest))) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
pTsdbFS = &fsLatest;
|
||||
}
|
||||
|
||||
// 2) save CURRENT
|
||||
if ((code = tsdbFSCommit1(pTsdb, pTsdbFS))) {
|
||||
code = tsdbFSCommit2(pTsdb, &fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// 3) apply the tsdbFS to pTsdb->fs
|
||||
if ((code = tsdbFSCommit2(pTsdb, pTsdbFS))) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _exit;
|
||||
}
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
|
||||
if (type == RETENTION_MIGRATE) {
|
||||
++nBatch;
|
||||
goto _retention_loop;
|
||||
}
|
||||
|
||||
_exit:
|
||||
tsdbFSDestroy(&fs);
|
||||
tsdbFSDestroy(&fsLatest);
|
||||
if (code != 0) {
|
||||
tsdbError("vgId:%d, tsdb do retention %" PRIi8 " failed since %s", TD_VID(pTsdb->pVnode), type, tstrerror(code));
|
||||
ASSERT(0);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Data migration between multi-tier storage, including remove expired data.
|
||||
* 1) firstly, remove expired DFileSet;
|
||||
* 2) partition the tsdbFS by the expLevel and the estimated cost(e.g. 5s) to copy, and migrate
|
||||
* DFileSet groups between multi-tier storage;
|
||||
* 3) update the tsdbFS and CURRENT in the same transaction;
|
||||
* 4) finish the migration
|
||||
* @param pTsdb
|
||||
* @param now
|
||||
* @param maxSpeed
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) {
|
||||
int32_t code = 0;
|
||||
int32_t retention = RETENTION_NO;
|
||||
|
||||
retention = tsdbShouldDoRetention(pTsdb, now);
|
||||
if (retention == RETENTION_NO) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// step 1: process expire
|
||||
code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_EXPIRED);
|
||||
if (code < 0) goto _exit;
|
||||
|
||||
// step 2: process multi-tier migration
|
||||
code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_MIGRATE);
|
||||
if (code < 0) goto _exit;
|
||||
|
||||
_exit:
|
||||
pTsdb->trimHdl.maxRetentFid = INT32_MIN;
|
||||
if (code != 0) {
|
||||
tsdbError("vgId:%d, tsdb do retention %d failed since %s, time:%" PRIi64 ", max speed:%" PRIi64,
|
||||
TD_VID(pTsdb->pVnode), retention, tstrerror(code), now, maxSpeed);
|
||||
ASSERT(0);
|
||||
// tsdbFSRollback(pTsdb->pFS);
|
||||
} else {
|
||||
tsdbInfo("vgId:%d, tsdb do retention %d succeed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pTsdb->pVnode),
|
||||
retention, now, maxSpeed);
|
||||
}
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
ASSERT(0);
|
||||
// tsdbFSRollback(pTsdb->pFS);
|
||||
return code;
|
||||
}
|
|
@ -509,24 +509,16 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
|
|||
*maxKey = *minKey + minutes * tsTickPerMin[precision] - 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief get fid level by keep and days.
|
||||
*
|
||||
* @param fid
|
||||
* @param pKeepCfg
|
||||
* @param now millisecond
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {
|
||||
int32_t aFid[3];
|
||||
TSKEY key;
|
||||
|
||||
if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
// now = now * 1000;
|
||||
now = now * 1000;
|
||||
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
now = now * 1000l;
|
||||
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
|
||||
now = now * 1000000l;
|
||||
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
|
||||
now = now * 1000000000l;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -186,17 +186,6 @@ void vnodePreClose(SVnode *pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void vnodeTrimDbClose(SVnode *pVnode) {
|
||||
int32_t nLoops = 0;
|
||||
while (atomic_load_8(&pVnode->trimDbH.state) != 0) {
|
||||
if (++nLoops > 1000) {
|
||||
vTrace("vgId:%d, wait for trimDb task to finish", TD_VID(pVnode));
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
vnodeCommit(pVnode);
|
||||
|
@ -208,7 +197,6 @@ void vnodeClose(SVnode *pVnode) {
|
|||
smaClose(pVnode->pSma);
|
||||
metaClose(pVnode->pMeta);
|
||||
vnodeCloseBufPool(pVnode);
|
||||
vnodeTrimDbClose(pVnode);
|
||||
// destroy handle
|
||||
tsem_destroy(&(pVnode->canCommit));
|
||||
tsem_destroy(&pVnode->syncSem);
|
||||
|
|
|
@ -378,7 +378,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SVTrimDbReq trimReq = {0};
|
||||
|
@ -389,7 +388,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, pVnode->config.vgId, trimReq.timestamp);
|
||||
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
|
||||
|
||||
// process
|
||||
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
|
||||
|
@ -401,91 +400,6 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
|
|||
_exit:
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
SVnode *pVnode;
|
||||
SVTrimDbReq trimReq;
|
||||
} SVndTrimDbReq;
|
||||
|
||||
void *vnodeProcessTrimReqFunc(void *param) {
|
||||
int32_t code = 0;
|
||||
int8_t oldVal = 0;
|
||||
SVndTrimDbReq *pReq = (SVndTrimDbReq *)param;
|
||||
SVnode *pVnode = pReq->pVnode;
|
||||
|
||||
setThreadName("vnode-trim");
|
||||
|
||||
// process
|
||||
code = tsdbDoRetention(pVnode->pTsdb, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed);
|
||||
if (code) goto _exit;
|
||||
|
||||
code = smaDoRetention(pVnode->pSma, pReq->trimReq.timestamp, pReq->trimReq.maxSpeed);
|
||||
if (code) goto _exit;
|
||||
_exit:
|
||||
oldVal = atomic_val_compare_exchange_8(&pVnode->trimDbH.state, 1, 0);
|
||||
ASSERT(oldVal == 1);
|
||||
if (code) {
|
||||
vError("vgId:%d, trim vnode thread failed since %s, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode),
|
||||
tstrerror(code), pReq->trimReq.timestamp, pReq->trimReq.maxSpeed);
|
||||
} else {
|
||||
vInfo("vgId:%d, trim vnode thread finish, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode),
|
||||
pReq->trimReq.timestamp, pReq->trimReq.maxSpeed);
|
||||
}
|
||||
taosMemoryFree(pReq);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SVndTrimDbReq *pVndTrimReq = taosMemoryMalloc(sizeof(SVndTrimDbReq));
|
||||
SVTrimDbHdl *pHandle = &pVnode->trimDbH;
|
||||
|
||||
if (!pVndTrimReq) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pVndTrimReq->pVnode = pVnode;
|
||||
|
||||
if (tDeserializeSVTrimDbReq(pReq, len, &pVndTrimReq->trimReq) != 0) {
|
||||
taosMemoryFree(pVndTrimReq);
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) {
|
||||
vInfo("vgId:%d, trim vnode request ignored since duplicated req, time:%" PRIi64 ", max speed:%" PRIi64,
|
||||
TD_VID(pVnode), pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed);
|
||||
taosMemoryFree(pVndTrimReq);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode),
|
||||
pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed);
|
||||
|
||||
TdThreadAttr thAttr = {0};
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED);
|
||||
|
||||
TdThread tid;
|
||||
if (taosThreadCreate(&tid, &thAttr, vnodeProcessTrimReqFunc, (void *)pVndTrimReq) != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(pVndTrimReq);
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0);
|
||||
ASSERT(oldVal == 1);
|
||||
vError("vgId:%d, failed to create pthread to trim vnode since %s", TD_VID(pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
vDebug("vgId:%d, success to create pthread to trim vnode", TD_VID(pVnode));
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
|
||||
_exit:
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||
|
@ -1007,8 +921,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
}
|
||||
|
||||
if (taosArrayGetSize(newTbUids) > 0) {
|
||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||
(int32_t)taosArrayGetSize(newTbUids));
|
||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
|
||||
}
|
||||
|
||||
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
|
||||
|
|
Loading…
Reference in New Issue