more code

This commit is contained in:
Hongze Cheng 2023-06-02 18:42:15 +08:00
parent a2dafefe03
commit 632c6fb4dd
6 changed files with 73 additions and 67 deletions

View File

@ -68,7 +68,7 @@ typedef struct SDataFileWriterConfig {
} SDataFileWriterConfig; } SDataFileWriterConfig;
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer);
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]); int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr);
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row); int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row);
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData);
int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer); int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer);

View File

@ -60,7 +60,7 @@ typedef struct SSttFileWriter SSttFileWriter;
typedef struct SSttFileWriterConfig SSttFileWriterConfig; typedef struct SSttFileWriterConfig SSttFileWriterConfig;
int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer); int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer);
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op); int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray);
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row);
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData);
int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData); int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData);

View File

@ -299,15 +299,9 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STFileOp op[1]; code = tsdbSttFileWriterClose(&committer->sttWriter, 0, committer->fopArray);
code = tsdbSttFileWriterClose(&committer->sttWriter, 0, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (op->optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND_PTR(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdbIterMergerClose(&committer->iterMerger); tsdbIterMergerClose(&committer->iterMerger);
TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);

View File

@ -299,13 +299,13 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader); code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
break; break;
} }
} }
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
@ -709,7 +709,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, STFileOp *op) { static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArray *opArr) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
TABLEID tbid[1] = {{INT64_MAX, INT64_MAX}}; TABLEID tbid[1] = {{INT64_MAX, INT64_MAX}};
@ -726,64 +726,81 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, STFileOp *
code = tsdbDataFileWriteFooter(writer); code = tsdbDataFileWriteFooter(writer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
STFileOp op;
int32_t ftype;
// .head // .head
int32_t ftype = TSDB_FTYPE_HEAD; ftype = TSDB_FTYPE_HEAD;
op[ftype] = (STFileOp){ if (writer->config->files[ftype].exist) {
op = (STFileOp){
.optype = TSDB_FOP_REMOVE,
.fid = writer->config->fid,
.of = writer->config->files[ftype].file,
};
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
op = (STFileOp){
.optype = TSDB_FOP_CREATE, .optype = TSDB_FOP_CREATE,
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// .data // .data
ftype = TSDB_FTYPE_DATA; ftype = TSDB_FTYPE_DATA;
if (writer->fd[ftype]) { if (writer->fd[ftype]) {
if (!writer->config->files[ftype].exist) { if (!writer->config->files[ftype].exist) {
op[ftype] = (STFileOp){ op = (STFileOp){
.optype = TSDB_FOP_CREATE, .optype = TSDB_FOP_CREATE,
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
} else if (writer->config->files[ftype].file.size == writer->files[ftype].size) {
op[ftype].optype = TSDB_FOP_NONE; code = TARRAY2_APPEND(opArr, op);
} else { TSDB_CHECK_CODE(code, lino, _exit);
op[ftype] = (STFileOp){ } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
op = (STFileOp){
.optype = TSDB_FOP_MODIFY, .optype = TSDB_FOP_MODIFY,
.fid = writer->config->fid, .fid = writer->config->fid,
.of = writer->config->files[ftype].file, .of = writer->config->files[ftype].file,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} }
} else {
op[ftype].optype = TSDB_FOP_NONE;
} }
// .sma // .sma
ftype = TSDB_FTYPE_SMA; ftype = TSDB_FTYPE_SMA;
if (writer->fd[ftype]) { if (writer->fd[ftype]) {
if (!writer->config->files[ftype].exist) { if (!writer->config->files[ftype].exist) {
op[ftype] = (STFileOp){ op = (STFileOp){
.optype = TSDB_FOP_CREATE, .optype = TSDB_FOP_CREATE,
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
} else if (writer->config->files[ftype].file.size == writer->files[ftype].size) {
op[ftype].optype = TSDB_FOP_NONE; code = TARRAY2_APPEND(opArr, op);
} else { TSDB_CHECK_CODE(code, lino, _exit);
op[ftype] = (STFileOp){ } else if (writer->config->files[ftype].file.size != writer->files[ftype].size) {
op = (STFileOp){
.optype = TSDB_FOP_MODIFY, .optype = TSDB_FOP_MODIFY,
.fid = writer->config->fid, .fid = writer->config->fid,
.of = writer->config->files[ftype].file, .of = writer->config->files[ftype].file,
.nf = writer->files[ftype], .nf = writer->files[ftype],
}; };
code = TARRAY2_APPEND(opArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} }
} else {
op[ftype].optype = TSDB_FOP_NONE;
} }
// .tomb // .tomb (TODO)
op[TSDB_FTYPE_TOMB] = (STFileOp){ ftype = TSDB_FTYPE_TOMB;
.optype = TSDB_FOP_NONE,
};
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (!writer->fd[i]) continue; if (!writer->fd[i]) continue;
@ -807,25 +824,21 @@ int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWri
return 0; return 0;
} }
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) { int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
if (!writer[0]->ctx->opened) { if (writer[0]->ctx->opened) {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
op[i].optype = TSDB_FOP_NONE;
}
} else {
if (abort) { if (abort) {
code = tsdbDataFileWriterCloseAbort(writer[0]); code = tsdbDataFileWriterCloseAbort(writer[0]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbDataFileWriterCloseCommit(writer[0], op); code = tsdbDataFileWriterCloseCommit(writer[0], opArr);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tsdbDataFileWriterDoClose(writer[0]); tsdbDataFileWriterDoClose(writer[0]);
} }
taosMemoryFree(writer); taosMemoryFree(writer[0]);
writer[0] = NULL; writer[0] = NULL;
_exit: _exit:

View File

@ -270,7 +270,16 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
merger->ctx->toData = true; merger->ctx->toData = true;
merger->ctx->level = 0; merger->ctx->level = 0;
TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {
// TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {
for (int32_t i = 0;; ++i) {
if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
merger->ctx->lvl = NULL;
break;
}
merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) { if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
merger->ctx->toData = false; merger->ctx->toData = false;
merger->ctx->lvl = NULL; merger->ctx->lvl = NULL;
@ -486,25 +495,12 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
int32_t lino = 0; int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode); int32_t vid = TD_VID(merger->tsdb->pVnode);
STFileOp op[TSDB_FTYPE_MAX]; code = tsdbSttFileWriterClose(&merger->sttWriter, 0, merger->fopArr);
code = tsdbSttFileWriterClose(&merger->sttWriter, 0, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (op->optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND_PTR(merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx->toData) { if (merger->ctx->toData) {
// TODO code = tsdbDataFileWriterClose(&merger->dataWriter, 0, merger->fopArr);
code = tsdbDataFileWriterClose(&merger->dataWriter, 0, op);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (op->optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND_PTR(merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
_exit: _exit:

View File

@ -621,7 +621,7 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
return 0; return 0;
} }
static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) {
int32_t lino; int32_t lino;
int32_t code; int32_t code;
int32_t vid = TD_VID(writer->config->tsdb->pVnode); int32_t vid = TD_VID(writer->config->tsdb->pVnode);
@ -656,10 +656,15 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) {
tsdbCloseFile(&writer->fd); tsdbCloseFile(&writer->fd);
ASSERT(writer->config->file.size < writer->file->size); ASSERT(writer->config->file.size < writer->file->size);
op->optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE; STFileOp op = {
op->fid = writer->config->file.fid; .optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE,
op->of = writer->config->file; .fid = writer->config->file.fid,
op->nf = writer->file[0]; .of = writer->config->file,
.nf = writer->file[0],
};
code = TARRAY2_APPEND(opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
@ -694,19 +699,17 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter
return 0; return 0;
} }
int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op) { int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode); int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode);
if (!writer[0]->ctx->opened) { if (writer[0]->ctx->opened) {
if (op) op->optype = TSDB_FOP_NONE;
} else {
if (abort) { if (abort) {
code = tsdbSttFWriterCloseAbort(writer[0]); code = tsdbSttFWriterCloseAbort(writer[0]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbSttFWriterCloseCommit(writer[0], op); code = tsdbSttFWriterCloseCommit(writer[0], opArray);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tsdbSttFWriterDoClose(writer[0]); tsdbSttFWriterDoClose(writer[0]);