adjust api code
This commit is contained in:
parent
b2bfbebfb1
commit
d537b2f8b6
|
@ -1083,8 +1083,8 @@ void tsdbRemoveFile(const char *path);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
int32_t tsdbAllocateDisk(STsdb *tsdb, int32_t fid, const char *label, SDiskID *diskId);
|
||||
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t fid, int32_t level, const char *label, SDiskID *diskId);
|
||||
int32_t tsdbAllocateDisk(STsdb *tsdb, const char *label, int32_t expLevel, SDiskID *diskId);
|
||||
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t level, const char *label, SDiskID *diskId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ typedef struct {
|
|||
struct {
|
||||
SFileSetCommitInfo *info;
|
||||
|
||||
int32_t expLevel;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
TABLEID tbid[1];
|
||||
|
@ -73,6 +74,7 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
|
|||
.cmprAlg = committer->cmprAlg,
|
||||
.fid = committer->ctx->info->fid,
|
||||
.cid = committer->cid,
|
||||
.expLevel = committer->ctx->expLevel,
|
||||
.level = 0,
|
||||
};
|
||||
|
||||
|
@ -321,6 +323,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
// check if can commit
|
||||
tsdbFSCheckCommit(tsdb, committer->ctx->info->fid);
|
||||
|
||||
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now);
|
||||
tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
|
||||
&committer->ctx->maxKey);
|
||||
|
||||
|
@ -335,8 +338,9 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64, TD_VID(tsdb->pVnode), __func__,
|
||||
committer->ctx->info->fid, committer->ctx->minKey, committer->ctx->maxKey);
|
||||
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, committer->ctx->info->fid, committer->ctx->minKey, committer->ctx->maxKey,
|
||||
committer->ctx->expLevel);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -98,8 +98,6 @@ typedef struct SDataFileRAWWriter {
|
|||
STsdbFD *fd;
|
||||
} SDataFileRAWWriter;
|
||||
|
||||
typedef struct SDataFileRAWWriter SDataFileRAWWriter;
|
||||
|
||||
int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer);
|
||||
int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr);
|
||||
|
||||
|
|
|
@ -751,7 +751,7 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
|
||||
// .head
|
||||
ftype = TSDB_FTYPE_HEAD;
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, writer->config->fid, tsdbFTypeLabel(ftype), &diskId);
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype] = (STFile){
|
||||
.type = ftype,
|
||||
|
@ -768,7 +768,7 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
if (writer->config->files[ftype].exist) {
|
||||
writer->files[ftype] = writer->config->files[ftype].file;
|
||||
} else {
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, writer->config->fid, tsdbFTypeLabel(ftype), &diskId);
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype] = (STFile){
|
||||
.type = ftype,
|
||||
|
@ -787,7 +787,7 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
if (writer->config->files[ftype].exist) {
|
||||
writer->files[ftype] = writer->config->files[ftype].file;
|
||||
} else {
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, writer->config->fid, tsdbFTypeLabel(ftype), &diskId);
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype] = (STFile){
|
||||
.type = ftype,
|
||||
|
@ -802,7 +802,7 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
|
|||
|
||||
// .tomb
|
||||
ftype = TSDB_FTYPE_TOMB;
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, writer->config->fid, tsdbFTypeLabel(ftype), &diskId);
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(ftype), writer->config->expLevel, &diskId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype] = (STFile){
|
||||
.type = ftype,
|
||||
|
|
|
@ -75,6 +75,7 @@ typedef struct SDataFileWriterConfig {
|
|||
int32_t szPage;
|
||||
int32_t fid;
|
||||
int64_t cid;
|
||||
int32_t expLevel;
|
||||
int64_t compactVersion;
|
||||
int32_t lcn;
|
||||
struct {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "tsdbFS2.h"
|
||||
|
||||
// SFSetRAWWriter ==================================================
|
||||
typedef struct SFSetRAWWriter {
|
||||
struct SFSetRAWWriter {
|
||||
SFSetRAWWriterConfig config[1];
|
||||
|
||||
struct {
|
||||
|
@ -28,7 +28,7 @@ typedef struct SFSetRAWWriter {
|
|||
|
||||
// writer
|
||||
SDataFileRAWWriter *dataWriter;
|
||||
} SFSetRAWWriter;
|
||||
};
|
||||
|
||||
int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer) {
|
||||
int32_t code = 0;
|
||||
|
@ -75,7 +75,7 @@ static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRA
|
|||
int32_t lino = 0;
|
||||
|
||||
SDiskID diskID = {0};
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, bHdr->file.fid, tsdbFTypeLabel(bHdr->file.type), &diskID);
|
||||
code = tsdbAllocateDisk(writer->config->tsdb, tsdbFTypeLabel(bHdr->file.type), writer->config->expLevel, &diskID);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
SDataFileRAWWriterConfig config = {
|
||||
|
|
|
@ -26,6 +26,7 @@ typedef struct SFSetRAWWriterConfig {
|
|||
STsdb *tsdb;
|
||||
int32_t szPage;
|
||||
|
||||
int32_t expLevel;
|
||||
int64_t fid;
|
||||
int64_t cid;
|
||||
int32_t level;
|
||||
|
|
|
@ -155,6 +155,7 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
|
|||
.szPage = config->szPage,
|
||||
.fid = config->fid,
|
||||
.cid = config->cid,
|
||||
.expLevel = config->expLevel,
|
||||
.compactVersion = config->compactVersion,
|
||||
.skmTb = writer[0]->skmTb,
|
||||
.skmRow = writer[0]->skmRow,
|
||||
|
@ -177,12 +178,14 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
|
|||
.szPage = config->szPage,
|
||||
.cmprAlg = config->cmprAlg,
|
||||
.compactVersion = config->compactVersion,
|
||||
.expLevel = config->expLevel,
|
||||
.fid = config->fid,
|
||||
.cid = config->cid,
|
||||
.level = config->level,
|
||||
.skmTb = writer[0]->skmTb,
|
||||
.skmRow = writer[0]->skmRow,
|
||||
.buffers = writer[0]->buffers,
|
||||
|
||||
};
|
||||
code = tsdbSttFileWriterOpen(&sttWriterConfig, &writer[0]->sttWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct {
|
|||
int8_t cmprAlg;
|
||||
int32_t fid;
|
||||
int64_t cid;
|
||||
int32_t expLevel;
|
||||
int32_t level;
|
||||
int32_t lcn;
|
||||
struct {
|
||||
|
|
|
@ -275,6 +275,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
|
|||
.cmprAlg = merger->cmprAlg,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.expLevel = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now),
|
||||
.level = merger->ctx->level,
|
||||
};
|
||||
|
||||
|
|
|
@ -240,7 +240,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, STFileObj *fobj) {
|
||||
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
|
@ -248,7 +248,6 @@ static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, STFileObj *fobj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t expLevel = tsdbFidLevel(fobj->f->fid, &rtner->tsdb->keepCfg, rtner->now);
|
||||
if (expLevel < 0) {
|
||||
// remove the file
|
||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
||||
|
@ -257,7 +256,7 @@ static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, STFileObj *fobj) {
|
|||
// Try to move the file to a new level
|
||||
SDiskID diskId = {0};
|
||||
|
||||
code = tsdbAllocateDiskAtLevel(rtner->tsdb, fobj->f->fid, expLevel, tsdbFTypeLabel(fobj->f->type), &diskId);
|
||||
code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &diskId);
|
||||
if (code) {
|
||||
tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
|
||||
fobj->fname, expLevel, tstrerror(code));
|
||||
|
@ -289,8 +288,9 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
|
|||
STFileSet *fset = rtner->fset;
|
||||
|
||||
// handle data file sets
|
||||
int32_t expLevel = tsdbFidLevel(fobj->f->fid, &rtner->tsdb->keepCfg, rtner->now);
|
||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
code = tsdbRemoveOrMoveFileObject(rtner, fset->farr[ftype]);
|
||||
code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fset->farr[ftype]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -298,7 +298,7 @@ static int32_t tsdbDoRetention(SRTNer *rtner) {
|
|||
SSttLvl *lvl;
|
||||
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||
code = tsdbRemoveOrMoveFileObject(rtner, fobj);
|
||||
code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -544,6 +544,7 @@ struct STsdbSnapWriter {
|
|||
bool fsetWriteBegin;
|
||||
int32_t fid;
|
||||
STFileSet* fset;
|
||||
int32_t expLevel;
|
||||
bool hasData; // if have time series data
|
||||
bool hasTomb; // if have tomb data
|
||||
|
||||
|
@ -799,6 +800,7 @@ static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
|
|||
.cmprAlg = writer->cmprAlg,
|
||||
.fid = writer->ctx->fid,
|
||||
.cid = writer->commitID,
|
||||
.expLevel = writer->ctx->expLevel,
|
||||
.level = writer->ctx->toSttOnly ? 1 : 0,
|
||||
};
|
||||
// merge stt files to either data or a new stt file
|
||||
|
@ -835,6 +837,8 @@ static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
|
|||
STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
||||
|
||||
writer->ctx->expLevel = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||
|
||||
writer->ctx->hasData = true;
|
||||
writer->ctx->hasTomb = true;
|
||||
|
||||
|
|
|
@ -370,6 +370,7 @@ static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) {
|
|||
.szPage = writer->szPage,
|
||||
.fid = writer->ctx->fid,
|
||||
.cid = writer->commitID,
|
||||
.expLevel = writer->ctx->level,
|
||||
.level = writer->ctx->level,
|
||||
};
|
||||
|
||||
|
@ -397,6 +398,8 @@ static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t
|
|||
STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
||||
|
||||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||
|
||||
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
|
|
|
@ -793,7 +793,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
|
|||
|
||||
// alloc disk id
|
||||
SDiskID diskId = {0};
|
||||
code = tsdbAllocateDisk(tsdb, writer->config->fid, "stt", &diskId);
|
||||
code = tsdbAllocateDisk(tsdb, tsdbFTypeLabel(TSDB_FTYPE_STT), writer->config->expLevel, &diskId);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->file[0] = (STFile){
|
||||
|
|
|
@ -82,6 +82,7 @@ struct SSttFileWriterConfig {
|
|||
int32_t szPage;
|
||||
int8_t cmprAlg;
|
||||
int64_t compactVersion;
|
||||
int32_t expLevel;
|
||||
int32_t fid;
|
||||
int64_t cid;
|
||||
int32_t level;
|
||||
|
|
|
@ -1804,14 +1804,13 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbAllocateDisk(STsdb *tsdb, int32_t fid, const char *label, SDiskID *diskId) {
|
||||
int32_t tsdbAllocateDisk(STsdb *tsdb, const char *label, int32_t expLevel, SDiskID *diskId) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SDiskID did = {0};
|
||||
STfs *tfs = tsdb->pVnode->pTfs;
|
||||
|
||||
int32_t expectedLevel = tsdbFidLevel(fid, &tsdb->keepCfg, taosGetTimestampSec());
|
||||
code = tfsAllocDisk(tfs, expectedLevel, label, &did);
|
||||
code = tfsAllocDisk(tfs, expLevel, label, &did);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
|
@ -1829,12 +1828,10 @@ int32_t tsdbAllocateDisk(STsdb *tsdb, int32_t fid, const char *label, SDiskID *d
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t fid, int32_t level, const char *label, SDiskID *diskId) {
|
||||
int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t level, const char *label, SDiskID *diskId) {
|
||||
int32_t code = 0;
|
||||
SDiskID did = {
|
||||
.level = level,
|
||||
};
|
||||
STfs *tfs = tsdb->pVnode->pTfs;
|
||||
SDiskID did = {0};
|
||||
STfs *tfs = tsdb->pVnode->pTfs;
|
||||
|
||||
code = tfsAllocDiskAtLevel(tfs, level, label, &did);
|
||||
if (code) {
|
||||
|
|
|
@ -152,18 +152,6 @@ int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level) {
|
|||
|
||||
int32_t tfsGetLevel(STfs *pTfs) { return pTfs->nlevel; }
|
||||
|
||||
int32_t tfsAllocDiskAtLevel(STfs *pTfs, int32_t level, const char *label, SDiskID *pDiskId) {
|
||||
pDiskId->level = level;
|
||||
pDiskId->id = -1;
|
||||
|
||||
pDiskId->id = tfsAllocDiskOnTier(&pTfs->tiers[pDiskId->level], label);
|
||||
if (pDiskId->id < 0) {
|
||||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||
}
|
||||
|
||||
TAOS_RETURN(0);
|
||||
}
|
||||
|
||||
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, const char *label, SDiskID *pDiskId) {
|
||||
if (expLevel >= pTfs->nlevel || expLevel < 0) {
|
||||
expLevel = pTfs->nlevel - 1;
|
||||
|
@ -178,6 +166,22 @@ int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, const char *label, SDiskID *p
|
|||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||
}
|
||||
|
||||
int32_t tfsAllocDiskAtLevel(STfs *pTfs, int32_t level, const char *label, SDiskID *pDiskId) {
|
||||
if (level < 0 || level >= pTfs->nlevel) {
|
||||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||
}
|
||||
|
||||
pDiskId->level = level;
|
||||
pDiskId->id = -1;
|
||||
|
||||
pDiskId->id = tfsAllocDiskOnTier(&pTfs->tiers[pDiskId->level], label);
|
||||
if (pDiskId->id < 0) {
|
||||
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
|
||||
}
|
||||
|
||||
TAOS_RETURN(0);
|
||||
}
|
||||
|
||||
const char *tfsGetPrimaryPath(STfs *pTfs) { return TFS_PRIMARY_DISK(pTfs)->path; }
|
||||
|
||||
const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId) { return TFS_DISK_AT(pTfs, diskId)->path; }
|
||||
|
|
Loading…
Reference in New Issue