tsdb retention function

This commit is contained in:
Hongze Cheng 2022-07-10 12:25:11 +00:00
parent 8c45b028c1
commit be1af580c2
5 changed files with 206 additions and 45 deletions

View File

@ -32,39 +32,38 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct STable STable;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile;
typedef struct STsdbCacheFile STsdbCacheFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
typedef struct SLastFile SLastFile;
typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState;
typedef struct STsdbSnapHdr STsdbSnapHdr;
typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelData SDelData;
typedef struct SDelIdx SDelIdx;
typedef struct STbData STbData;
typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct STable STable;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
typedef struct SLastFile SLastFile;
typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
typedef struct SDelFWriter SDelFWriter;
typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState;
typedef struct STsdbSnapHdr STsdbSnapHdr;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
@ -163,6 +162,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
// other
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
// tsdbMemTable ==============================================================================================
@ -200,6 +200,7 @@ int32_t tsdbFSRollback(STsdbFS *pFS);
int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
// tsdbReaderWriter.c ==============================================================================================
@ -213,6 +214,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);

View File

@ -688,6 +688,14 @@ _exit:
return code;
}
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) {
int32_t idx;
idx = taosArraySearchIdx(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
ASSERT(idx >= 0);
taosArrayRemove(pState->aDFileSet, idx);
}
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {

View File

@ -1913,3 +1913,114 @@ _err:
taosArrayDestroy(aBlockCol);
return code;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t code = 0;
int64_t n;
int64_t size;
TdFilePtr pOutFD = NULL; // TODO
TdFilePtr PInFD = NULL; // TODO
char fNameFrom[TSDB_FILENAME_LEN];
char fNameTo[TSDB_FILENAME_LEN];
// head
tsdbDataFileName(pTsdb, pSetFrom, TSDB_HEAD_FILE, fNameFrom);
tsdbDataFileName(pTsdb, pSetTo, TSDB_HEAD_FILE, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fHead.size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
// data
tsdbDataFileName(pTsdb, pSetFrom, TSDB_DATA_FILE, fNameFrom);
tsdbDataFileName(pTsdb, pSetTo, TSDB_DATA_FILE, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fData.size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
// last
tsdbDataFileName(pTsdb, pSetFrom, TSDB_LAST_FILE, fNameFrom);
tsdbDataFileName(pTsdb, pSetTo, TSDB_LAST_FILE, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fLast.size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
// sma
tsdbDataFileName(pTsdb, pSetFrom, TSDB_SMA_FILE, fNameFrom);
tsdbDataFileName(pTsdb, pSetTo, TSDB_SMA_FILE, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->fSma.size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
return code;
_err:
tsdbError("vgId:%d tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}

View File

@ -25,8 +25,37 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
// do retention
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs->nState->aDFileSet); iSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs->nState->aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pDFileSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
// TODO
// check
if (expLevel == pDFileSet->fid) continue;
if (expLevel < 0) {
tsdbFSStateDeleteDFileSet(pTsdb->fs->nState, pDFileSet->fid);
iSet--;
// tsdbInfo("vgId:%d file is out of data, remove it", td);
} else {
// alloc
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
goto _err;
}
if (did.level == pDFileSet->diskId.level) continue;
ASSERT(did.level > pDFileSet->diskId.level);
// copy the file to new disk
SDFileSet nDFileSet = *pDFileSet;
nDFileSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet);
if (code) goto _err;
code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, &nDFileSet);
if (code) goto _err;
}
}
// commit
@ -38,5 +67,6 @@ _exit:
_err:
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbFSRollback(pTsdb->fs);
return code;
}

View File

@ -465,17 +465,27 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
*maxKey = *minKey + minutes * tsTickPerMin[precision] - 1;
}
// int tsdFidLevel(int fid, TSKEY now, minute) {
// if (fid >= pRtn->maxFid) {
// return 0;
// } else if (fid >= pRtn->midFid) {
// return 1;
// } else if (fid >= pRtn->minFid) {
// return 2;
// } else {
// return -1;
// }
// }
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {
int32_t aFid[3];
TSKEY key;
key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep0);
key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep1);
key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->keep2);
if (fid >= aFid[0]) {
return 0;
} else if (fid >= aFid[1]) {
return 1;
} else if (fid >= aFid[2]) {
return 2;
} else {
return -1;
}
}
// TSDBROW ======================================================
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {