retention code
This commit is contained in:
parent
fe2266af26
commit
9071c650a1
|
@ -122,6 +122,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
int32_t tsdbBegin(STsdb* pTsdb);
|
int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
int32_t tsdbCommit(STsdb* pTsdb);
|
int32_t tsdbCommit(STsdb* pTsdb);
|
||||||
|
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
|
|
|
@ -15,49 +15,81 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t *canDo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
STsdbFSState *pState;
|
||||||
|
|
||||||
// begin
|
if (try) {
|
||||||
code = tsdbFSBegin(pTsdb->fs);
|
pState = pTsdb->fs->cState;
|
||||||
if (code) goto _err;
|
*canDo = 0;
|
||||||
|
} else {
|
||||||
|
pState = pTsdb->fs->nState;
|
||||||
|
}
|
||||||
|
|
||||||
// do retention
|
for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) {
|
||||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs->nState->aDFileSet); iSet++) {
|
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pState->aDFileSet, iSet);
|
||||||
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs->nState->aDFileSet, iSet);
|
|
||||||
int32_t expLevel = tsdbFidLevel(pDFileSet->fid, &pTsdb->keepCfg, now);
|
int32_t expLevel = tsdbFidLevel(pDFileSet->fid, &pTsdb->keepCfg, now);
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (expLevel == pDFileSet->fid) continue;
|
if (expLevel == pDFileSet->fid) continue;
|
||||||
|
|
||||||
|
// delete or move
|
||||||
if (expLevel < 0) {
|
if (expLevel < 0) {
|
||||||
tsdbFSStateDeleteDFileSet(pTsdb->fs->nState, pDFileSet->fid);
|
if (try) {
|
||||||
iSet--;
|
*canDo = 1;
|
||||||
// tsdbInfo("vgId:%d file is out of data, remove it", td);
|
} else {
|
||||||
|
tsdbFSStateDeleteDFileSet(pState, pDFileSet->fid);
|
||||||
|
iSet--;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// alloc
|
// alloc
|
||||||
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _err;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (did.level == pDFileSet->diskId.level) continue;
|
if (did.level == pDFileSet->diskId.level) continue;
|
||||||
|
|
||||||
ASSERT(did.level > pDFileSet->diskId.level);
|
if (try) {
|
||||||
|
*canDo = 1;
|
||||||
|
} else {
|
||||||
|
// copy the file to new disk
|
||||||
|
|
||||||
// copy the file to new disk
|
SDFileSet nDFileSet = *pDFileSet;
|
||||||
SDFileSet nDFileSet = *pDFileSet;
|
nDFileSet.diskId = did;
|
||||||
nDFileSet.diskId = did;
|
|
||||||
|
|
||||||
code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet);
|
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, &nDFileSet);
|
code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet);
|
||||||
if (code) goto _err;
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
code = tsdbFSStateUpsertDFileSet(pState, &nDFileSet);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int8_t canDo;
|
||||||
|
|
||||||
|
// try
|
||||||
|
tsdbDoRetentionImpl(pTsdb, now, 1, &canDo);
|
||||||
|
if (!canDo) goto _exit;
|
||||||
|
|
||||||
|
// begin
|
||||||
|
code = tsdbFSBegin(pTsdb->fs);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// do retention
|
||||||
|
code = tsdbDoRetentionImpl(pTsdb, now, 0, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
// commit
|
// commit
|
||||||
code = tsdbFSCommit(pTsdb->fs);
|
code = tsdbFSCommit(pTsdb->fs);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
|
@ -469,6 +469,16 @@ int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {
|
||||||
int32_t aFid[3];
|
int32_t aFid[3];
|
||||||
TSKEY key;
|
TSKEY key;
|
||||||
|
|
||||||
|
if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
|
||||||
|
now = now * 1000;
|
||||||
|
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
now = now * 1000000l;
|
||||||
|
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
now = now * 1000000000l;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
|
key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
|
||||||
aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep0);
|
aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep0);
|
||||||
key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
|
key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
|
||||||
|
|
|
@ -350,20 +350,23 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
SVTrimDbReq trimReq = {0};
|
SVTrimDbReq trimReq = {0};
|
||||||
if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
|
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
|
||||||
int32_t ret = 0;
|
|
||||||
if (ret != 0) {
|
// decode
|
||||||
goto end;
|
if (tDeserializeSVTrimDbReq(pReq, len, &trimReq) != 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
// process
|
||||||
return ret;
|
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
|
|
Loading…
Reference in New Issue