more code
This commit is contained in:
parent
616a675682
commit
10e6e52670
|
@ -73,10 +73,10 @@ struct SSttFileWriterConfig {
|
|||
int32_t szPage;
|
||||
int8_t cmprAlg;
|
||||
int64_t compactVersion; // compact version
|
||||
STFile file;
|
||||
SSkmInfo *skmTb;
|
||||
SSkmInfo *skmRow;
|
||||
uint8_t **aBuf;
|
||||
STFile file;
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -364,7 +364,10 @@ int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger
|
|||
}
|
||||
|
||||
int32_t tsdbIterMergerClear(SIterMerger **merger) {
|
||||
if (merger[0]) {
|
||||
taosMemoryFree(merger[0]);
|
||||
merger[0] = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,27 +25,27 @@ typedef struct {
|
|||
int8_t cmprAlg;
|
||||
int64_t compactVersion;
|
||||
int64_t cid;
|
||||
SSkmInfo skmTb;
|
||||
SSkmInfo skmRow;
|
||||
uint8_t *aBuf[5];
|
||||
SSkmInfo skmTb[1];
|
||||
|
||||
// context
|
||||
struct {
|
||||
bool opened;
|
||||
|
||||
STFileSet *fset;
|
||||
bool toData;
|
||||
int32_t level;
|
||||
SSttLvl *lvl;
|
||||
STFileObj *fobj;
|
||||
SRowInfo *row;
|
||||
SBlockData bData;
|
||||
SBlockData bData[1];
|
||||
} ctx[1];
|
||||
|
||||
TFileOpArray fopArr[1];
|
||||
|
||||
// reader
|
||||
TSttFileReaderArray sttReaderArr[1];
|
||||
SDataFileReader *dataReader;
|
||||
// iter
|
||||
TTsdbIterArray iterArr[1];
|
||||
SIterMerger *iterMerger;
|
||||
TFileOpArray fopArr[1];
|
||||
// writer
|
||||
SSttFileWriter *sttWriter;
|
||||
SDataFileWriter *dataWriter;
|
||||
|
@ -63,7 +63,6 @@ static int32_t tsdbMergerOpen(SMerger *merger) {
|
|||
}
|
||||
|
||||
static int32_t tsdbMergerClose(SMerger *merger) {
|
||||
// TODO
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnode *pVnode = merger->tsdb->pVnode;
|
||||
|
@ -78,81 +77,94 @@ static int32_t tsdbMergerClose(SMerger *merger) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// clear the merge
|
||||
TARRAY2_FREE(merger->iterArr);
|
||||
TARRAY2_FREE(merger->sttReaderArr);
|
||||
TARRAY2_FREE(merger->fopArr);
|
||||
tBlockDataDestroy(merger->ctx->bData);
|
||||
tDestroyTSchema(merger->skmTb->pTSchema);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
} else {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeNextRow(SMerger *merger) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) {
|
||||
if (merger->ctx->bData.nRow == 0) return 0;
|
||||
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
|
||||
if (merger->ctx->bData->nRow == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
if (merger->ctx->bData.nRow >= merger->minRow) {
|
||||
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx->bData);
|
||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (merger->ctx->bData->nRow < merger->minRow) {
|
||||
code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, &merger->ctx->bData);
|
||||
code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tBlockDataReset(&merger->ctx->bData);
|
||||
tBlockDataClear(merger->ctx->bData);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
code = tsdbUpdateSkmTb(merger->tsdb, (const TABLEID *)merger->ctx->row, merger->skmTb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tBlockDataInit(merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb->pTSchema, NULL, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeToData(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
for (;;) {
|
||||
code = tsdbMergeNextRow(merger);
|
||||
while ((merger->ctx->row = tsdbIterMergerGet(merger->iterMerger))) {
|
||||
if (merger->ctx->row->uid != merger->ctx->bData->uid) {
|
||||
code = tsdbMergeToDataTableEnd(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (!merger->ctx->row) {
|
||||
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!TABLE_SAME_SCHEMA(merger->ctx->bData.suid, merger->ctx->bData.suid, merger->ctx->row->suid,
|
||||
merger->ctx->row->uid)) {
|
||||
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx->row, &merger->skmTb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tBlockDataInit(&merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb.pTSchema, NULL, 0);
|
||||
code = tsdbMergeToDataTableBegin(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tBlockDataAppendRow(&merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid);
|
||||
code = tBlockDataAppendRow(merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (merger->ctx->bData.nRow >= merger->maxRow) {
|
||||
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||
if (merger->ctx->bData->nRow >= merger->maxRow) {
|
||||
code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tBlockDataReset(merger->ctx->bData);
|
||||
}
|
||||
|
||||
code = tsdbIterMergerNext(merger->iterMerger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbMergeToDataTableEnd(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -162,13 +174,12 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
|
|||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
for (;;) {
|
||||
code = tsdbMergeNextRow(merger);
|
||||
SRowInfo *row;
|
||||
while ((row = tsdbIterMergerGet(merger->iterMerger))) {
|
||||
code = tsdbSttFileWriteTSData(merger->sttWriter, row);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (!merger->ctx->row) break;
|
||||
|
||||
code = tsdbSttFileWriteTSData(merger->sttWriter, merger->ctx->row);
|
||||
code = tsdbIterMergerNext(merger->iterMerger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -179,97 +190,185 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
|
||||
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
STFileSet *fset = merger->ctx->fset;
|
||||
|
||||
// prepare the merger file set
|
||||
SSttLvl *lvl;
|
||||
STFileObj *fobj;
|
||||
merger->ctx->toData = true;
|
||||
merger->ctx->level = 0;
|
||||
|
||||
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||
if (lvl->level != merger->ctx->level) {
|
||||
lvl = NULL;
|
||||
TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {
|
||||
if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
|
||||
merger->ctx->toData = false;
|
||||
merger->ctx->lvl = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
fobj = TARRAY2_GET(lvl->fobjArr, 0);
|
||||
if (fobj->f->stt->nseg < merger->tsdb->pVnode->config.sttTrigger) {
|
||||
ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);
|
||||
|
||||
merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);
|
||||
if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
|
||||
merger->ctx->toData = false;
|
||||
break;
|
||||
} else {
|
||||
ASSERT(lvl->level == 0 || TARRAY2_SIZE(lvl->fobjArr) == 1);
|
||||
merger->ctx->level++;
|
||||
|
||||
// add the operation
|
||||
STFileOp op = {
|
||||
.optype = TSDB_FOP_REMOVE,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.of = merger->ctx->fobj->f[0],
|
||||
};
|
||||
code = TARRAY2_APPEND(merger->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// open the reader
|
||||
SSttFileReader *reader;
|
||||
SSttFileReaderConfig config = {
|
||||
SSttFileReaderConfig config[1] = {{
|
||||
.tsdb = merger->tsdb,
|
||||
// TODO
|
||||
};
|
||||
code = tsdbSttFReaderOpen(fobj->fname, &config, &reader);
|
||||
.szPage = merger->szPage,
|
||||
.file[0] = merger->ctx->fobj->f[0],
|
||||
}};
|
||||
code = tsdbSttFReaderOpen(merger->ctx->fobj->fname, config, &reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = TARRAY2_APPEND(merger->sttReaderArr, reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
// add the operation
|
||||
STFileOp op = {
|
||||
.fid = fobj->f->fid,
|
||||
.optype = TSDB_FOP_REMOVE,
|
||||
.of = fobj->f[0],
|
||||
};
|
||||
code = TARRAY2_APPEND(merger->fopArr, op);
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
SSttFileReader *sttReader;
|
||||
TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
|
||||
const TSttSegReaderArray *segReaderArr;
|
||||
|
||||
code = tsdbSttFReaderGetSegReader(sttReader, &segReaderArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
SSttSegReader *segReader;
|
||||
TARRAY2_FOREACH(segReaderArr, segReader) {
|
||||
STsdbIter *iter;
|
||||
|
||||
STsdbIterConfig config[1] = {{
|
||||
.type = TSDB_ITER_TYPE_STT,
|
||||
.sttReader = segReader,
|
||||
}};
|
||||
|
||||
code = tsdbIterOpen(config, &iter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = TARRAY2_APPEND(merger->iterArr, iter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
// open stt file writer
|
||||
if (lvl) {
|
||||
SSttFileWriterConfig config = {
|
||||
.tsdb = merger->tsdb,
|
||||
.maxRow = merger->maxRow,
|
||||
.szPage = merger->szPage,
|
||||
.cmprAlg = merger->cmprAlg,
|
||||
.skmTb = &merger->skmTb,
|
||||
.skmRow = &merger->skmRow,
|
||||
.aBuf = merger->aBuf,
|
||||
.file = fobj->f[0],
|
||||
};
|
||||
code = tsdbSttFileWriterOpen(&config, &merger->sttWriter);
|
||||
code = tsdbIterMergerInit(merger->iterArr, &merger->iterMerger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
SSttFileWriterConfig config = {
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
SDiskID did = {
|
||||
.level = 0,
|
||||
.id = 0,
|
||||
}; // TODO
|
||||
|
||||
if (merger->ctx->lvl) { // to existing level
|
||||
SSttFileWriterConfig config[1] = {{
|
||||
.tsdb = merger->tsdb,
|
||||
.maxRow = merger->maxRow,
|
||||
.szPage = merger->szPage,
|
||||
.cmprAlg = merger->cmprAlg,
|
||||
.skmTb = &merger->skmTb,
|
||||
.skmRow = &merger->skmRow,
|
||||
.aBuf = merger->aBuf,
|
||||
.compactVersion = merger->compactVersion,
|
||||
.file = merger->ctx->fobj->f[0],
|
||||
}};
|
||||
code = tsdbSttFileWriterOpen(config, &merger->sttWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else { // to new level
|
||||
SSttFileWriterConfig config[1] = {{
|
||||
.tsdb = merger->tsdb,
|
||||
.maxRow = merger->maxRow,
|
||||
.szPage = merger->szPage,
|
||||
.cmprAlg = merger->cmprAlg,
|
||||
.compactVersion = merger->compactVersion,
|
||||
.file =
|
||||
(STFile){
|
||||
{
|
||||
.type = TSDB_FTYPE_STT,
|
||||
.did = {.level = 0, .id = 0},
|
||||
.fid = fset->fid,
|
||||
.did = did,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.size = 0,
|
||||
.stt = {{.level = merger->ctx->level, .nseg = 0}},
|
||||
.stt = {{
|
||||
.level = merger->ctx->level,
|
||||
.nseg = 0,
|
||||
}},
|
||||
},
|
||||
};
|
||||
code = tsdbSttFileWriterOpen(&config, &merger->sttWriter);
|
||||
}};
|
||||
code = tsdbSttFileWriterOpen(config, &merger->sttWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// open data file writer
|
||||
if (merger->ctx->toData) {
|
||||
if (merger->ctx->toData) { // TODO
|
||||
tBlockDataReset(merger->ctx->bData);
|
||||
|
||||
SDataFileWriterConfig config = {
|
||||
.tsdb = merger->tsdb,
|
||||
// TODO
|
||||
.maxRow = merger->maxRow,
|
||||
.f =
|
||||
{
|
||||
[0] =
|
||||
{
|
||||
.type = TSDB_FTYPE_HEAD,
|
||||
.did = did,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.size = 0,
|
||||
},
|
||||
[1] =
|
||||
{
|
||||
.type = TSDB_FTYPE_DATA,
|
||||
.did = did,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.size = 0,
|
||||
},
|
||||
[2] =
|
||||
{
|
||||
.type = TSDB_FTYPE_SMA,
|
||||
.did = did,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.size = 0,
|
||||
},
|
||||
[3] =
|
||||
{
|
||||
.type = TSDB_FTYPE_TOMB,
|
||||
.did = did,
|
||||
.fid = merger->ctx->fset->fid,
|
||||
.cid = merger->cid,
|
||||
.size = 0,
|
||||
},
|
||||
},
|
||||
};
|
||||
code = tsdbDataFileWriterOpen(&config, &merger->dataWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -277,46 +376,110 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
|
||||
ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
|
||||
ASSERT(merger->iterMerger == NULL);
|
||||
ASSERT(merger->sttWriter == NULL);
|
||||
ASSERT(merger->dataWriter == NULL);
|
||||
|
||||
// open reader
|
||||
code = tsdbMergeFileSetBeginOpenReader(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// open iterator
|
||||
code = tsdbMergeFileSetBeginOpenIter(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// open writer
|
||||
code = tsdbMergeFileSetBeginOpenWriter(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
STFileOp op[1];
|
||||
|
||||
if (merger->ctx->toData) {
|
||||
code = tsdbDataFileWriterClose(&merger->dataWriter, 0, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (op->optype != TSDB_FOP_NONE) {
|
||||
code = TARRAY2_APPEND_PTR(merger->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbSttFileWriterClose(&merger->sttWriter, 0, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (op->optype != TSDB_FOP_NONE) {
|
||||
code = TARRAY2_APPEND_PTR(merger->fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
|
||||
tsdbIterMergerClear(&merger->iterMerger);
|
||||
TARRAY2_CLEAR(merger->iterArr, tsdbIterClose);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
|
||||
TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFReaderClose);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||
|
||||
STFileOp op;
|
||||
code = tsdbSttFileWriterClose(&merger->sttWriter, 0, &op);
|
||||
code = tsdbMergeFileSetEndCloseWriter(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (op.optype != TSDB_FOP_NONE) {
|
||||
code = TARRAY2_APPEND(merger->fopArr, op);
|
||||
code = tsdbMergeFileSetEndCloseIter(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (merger->ctx->toData) {
|
||||
// code = tsdbDataFWriterClose();
|
||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
code = tsdbMergeFileSetEndCloseReader(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
|
||||
TSDB_ERROR_LOG(vid, lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (!merger->ctx->opened) {
|
||||
code = tsdbMergerOpen(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
merger->ctx->fset = fset;
|
||||
|
||||
code = tsdbMergeFileSetBegin(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
|
@ -350,12 +513,17 @@ static int32_t tsdbDoMerge(SMerger *merger) {
|
|||
SSttLvl *lvl;
|
||||
STFileObj *fobj;
|
||||
TARRAY2_FOREACH(merger->fsetArr, fset) {
|
||||
lvl = TARRAY2_SIZE(fset->lvlArr) ? TARRAY2_FIRST(fset->lvlArr) : NULL;
|
||||
lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL;
|
||||
if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue;
|
||||
|
||||
fobj = TARRAY2_FIRST(lvl->fobjArr);
|
||||
if (fobj->f->stt->nseg < merger->sttTrigger) continue;
|
||||
|
||||
if (!merger->ctx->opened) {
|
||||
code = tsdbMergerOpen(merger);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbMergeFileSet(merger, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue