more code

This commit is contained in:
Hongze Cheng 2023-06-06 16:42:51 +08:00
parent 89a9e01375
commit e6fb46c072
3 changed files with 60 additions and 16 deletions

View File

@ -64,6 +64,7 @@ int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArr
int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row);
int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData);
int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record);
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer);
struct SSttFileWriterConfig {
STsdb *tsdb;

View File

@ -48,8 +48,10 @@ typedef struct {
// reader
TSttFileReaderArray sttReaderArr[1];
// iter
TTsdbIterArray iterArr[1];
SIterMerger *iterMerger;
TTsdbIterArray dataIterArr[1];
SIterMerger *dataIterMerger;
TTsdbIterArray tombIterArr[1];
SIterMerger *tombIterMerger;
// writer
SSttFileWriter *sttWriter;
SDataFileWriter *dataWriter;
@ -86,12 +88,12 @@ static int32_t tsdbMergerClose(SMerger *merger) {
ASSERT(merger->dataWriter == NULL);
ASSERT(merger->sttWriter == NULL);
ASSERT(merger->iterMerger == NULL);
ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
ASSERT(merger->dataIterMerger == NULL);
ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
// clear the merge
TARRAY2_FREE(merger->iterArr);
TARRAY2_FREE(merger->dataIterArr);
TARRAY2_FREE(merger->sttReaderArr);
TARRAY2_FREE(merger->fopArr);
for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
@ -196,7 +198,8 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
for (SRowInfo *row; (row = tsdbIterMergerGet(merger->iterMerger)) != NULL;) {
// data
for (SRowInfo *row; (row = tsdbIterMergerGet(merger->dataIterMerger)) != NULL;) {
if (row->uid != merger->ctx->tbid->uid) {
code = tsdbMergeToDataTableEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit);
@ -234,13 +237,28 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) {
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(merger->iterMerger);
code = tsdbIterMergerNext(merger->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeToDataTableEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit);
// tomb
STombRecord *record;
while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) {
if (tsdbSttFileWriterIsOpened(merger->sttWriter)) {
code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDataFileWriteTombRecord(merger->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbIterMergerNext(merger->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
@ -253,12 +271,23 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
// data
SRowInfo *row;
while ((row = tsdbIterMergerGet(merger->iterMerger))) {
while ((row = tsdbIterMergerGet(merger->dataIterMerger))) {
code = tsdbSttFileWriteTSData(merger->sttWriter, row);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(merger->iterMerger);
code = tsdbIterMergerNext(merger->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
// tomb
STombRecord *record;
while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) {
code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(merger->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -352,15 +381,25 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
.sttReader = segReader,
}};
// data iter
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->dataIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->iterArr, iter);
// tomb iter
config->type = TSDB_ITER_TYPE_STT_TOMB;
code = tsdbIterOpen(config, &iter);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(merger->tombIterArr, iter);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger, false);
code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -464,8 +503,8 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
int32_t lino = 0;
ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
ASSERT(merger->iterMerger == NULL);
ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
ASSERT(merger->dataIterMerger == NULL);
ASSERT(merger->sttWriter == NULL);
ASSERT(merger->dataWriter == NULL);
@ -516,8 +555,10 @@ _exit:
}
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
tsdbIterMergerClose(&merger->iterMerger);
TARRAY2_CLEAR(merger->iterArr, tsdbIterClose);
tsdbIterMergerClose(&merger->tombIterMerger);
TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);
tsdbIterMergerClose(&merger->dataIterMerger);
TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);
return 0;
}

View File

@ -891,3 +891,5 @@ _exit:
}
return code;
}
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }