From a2dafefe0321fcfc070e24c04fece88f840c75a8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 2 Jun 2023 17:36:14 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/inc/tsdbDataFileRW.h | 33 +- .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 446 ++++++++++++++---- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 65 +-- 3 files changed, 406 insertions(+), 138 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 001d749e25..35bbdddcda 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -32,31 +32,36 @@ typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray; typedef struct SDataFileReader SDataFileReader; typedef struct SDataFileReaderConfig { STsdb *tsdb; - STFile f[TSDB_FTYPE_MAX]; int32_t szPage; + struct { + bool exist; + STFile file; + } files[TSDB_FTYPE_MAX]; + uint8_t **bufArr; } SDataFileReaderConfig; int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config, SDataFileReader **reader); -int32_t tsdbDataFileReaderClose(SDataFileReader *reader); +int32_t tsdbDataFileReaderClose(SDataFileReader **reader); int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray); int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray); - int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData); -int32_t tsdbDataFileReadDelData(SDataFileReader *reader, const SDelBlk *delBlk, SDelData *dData); // SDataFileWriter ============================================= typedef struct SDataFileWriter SDataFileWriter; typedef struct SDataFileWriterConfig { - STsdb *tsdb; - bool hasOldFile; - STFile of[TSDB_FTYPE_MAX]; - int8_t cmprAlg; - int32_t maxRow; - int32_t szPage; - int64_t cid; - SDiskID did[1]; - int64_t compactVersion; + STsdb *tsdb; + int8_t cmprAlg; + int32_t maxRow; + int32_t szPage; + int32_t fid; + int64_t cid; + SDiskID did; + int64_t compactVersion; + struct { + bool exist; + STFile file; + } files[TSDB_FTYPE_MAX]; SSkmInfo *skmTb; SSkmInfo *skmRow; uint8_t **bufArr; @@ -66,7 +71,7 @@ int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWri int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]); int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row); int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); -int32_t tsdbDataFileFLushTSDataBlock(SDataFileWriter *writer); +int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index c237b01ac2..3d89647d56 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -24,66 +24,197 @@ typedef struct { struct SDataFileReader { struct SDataFileReaderConfig config[1]; + uint8_t *bufArr[5]; + struct { - bool blockIdxLoaded; + bool footerLoaded; + bool blockIdxLoaded; + TABLEID tbid[1]; } ctx[1]; - STsdbFD *fd[TSDB_FTYPE_MAX]; + STsdbFD *fd[TSDB_FTYPE_MAX]; + + SDataFooter footer[1]; TBlockIdxArray blockIdxArray[1]; + TDataBlkArray dataBlkArray[1]; }; +static int32_t tsdbDataFileReadFooter(SDataFileReader *reader) { + if (!reader->config->files[TSDB_FTYPE_HEAD].exist // + || reader->ctx->footerLoaded) { + return 0; + } + + int32_t code = 0; + int32_t lino = 0; + + code = + tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->config->files[TSDB_FTYPE_HEAD].file.size - sizeof(SDataFooter), + (uint8_t *)reader->footer, sizeof(SDataFooter)); + TSDB_CHECK_CODE(code, lino, _exit); + reader->ctx->footerLoaded = true; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig *config, SDataFileReader **reader) { int32_t code = 0; - int32_t lino; - int32_t vid = TD_VID(config->tsdb->pVnode); + int32_t lino = 0; - reader[0] = taosMemoryCalloc(1, sizeof(SDataFileReader)); + reader[0] = taosMemoryCalloc(1, sizeof(**reader)); if (!reader[0]) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } reader[0]->config[0] = config[0]; + if (!reader[0]->config->bufArr) { + reader[0]->config->bufArr = reader[0]->bufArr; + } + + if (fname) { + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + if (fname[i] == NULL) continue; - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - if (fname[i]) { code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } + } else { + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + if (!config->files[i].exist) continue; + + char fname1[TSDB_FILENAME_LEN]; + tsdbTFileName(config->tsdb, &config->files[i].file, fname1); + code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + TSDB_CHECK_CODE(code, lino, _exit); + } } - // TODO - _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code); } return code; } -int32_t tsdbDataFileReaderClose(SDataFileReader *reader) { - // TODO +int32_t tsdbDataFileReaderClose(SDataFileReader **reader) { + if (reader[0] == NULL) return 0; + + TARRAY2_FREE(reader[0]->dataBlkArray); + TARRAY2_FREE(reader[0]->blockIdxArray); + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + tsdbCloseFile(&reader[0]->fd[i]); + } + + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) { + tFree(reader[0]->bufArr[i]); + } + taosMemoryFree(reader[0]); + reader[0] = NULL; + return 0; } int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbDataFileReadFooter(reader); + TSDB_CHECK_CODE(code, lino, _exit); + if (!reader->ctx->blockIdxLoaded) { - // TODO + TARRAY2_CLEAR(reader->blockIdxArray, NULL); + + if (reader->config->files[TSDB_FTYPE_HEAD].exist // + && reader->footer->blockIdxPtr->size) { + code = tRealloc(&reader->config->bufArr[0], reader->footer->blockIdxPtr->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->footer->blockIdxPtr->offset, reader->config->bufArr[0], + reader->footer->blockIdxPtr->size); + TSDB_CHECK_CODE(code, lino, _exit); + + int32_t size = reader->footer->blockIdxPtr->size / sizeof(SBlockIdx); + for (int32_t i = 0; i < size; ++i) { + code = TARRAY2_APPEND_PTR(reader->blockIdxArray, ((SBlockIdx *)reader->config->bufArr[0]) + i); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + reader->ctx->blockIdxLoaded = true; } + blockIdxArray[0] = reader->blockIdxArray; - return 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; } int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray) { - // TODO - return 0; + ASSERT(reader->ctx->footerLoaded); + + if (reader->ctx->tbid->suid == blockIdx->suid && reader->ctx->tbid->uid == blockIdx->uid) { + dataBlkArray[0] = reader->dataBlkArray; + return 0; + } + + int32_t code = 0; + int32_t lino = 0; + + reader->ctx->tbid->suid = blockIdx->suid; + reader->ctx->tbid->uid = blockIdx->uid; + + TARRAY2_CLEAR(reader->dataBlkArray, NULL); + + code = tRealloc(&reader->config->bufArr[0], blockIdx->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], blockIdx->offset, reader->config->bufArr[0], blockIdx->size); + TSDB_CHECK_CODE(code, lino, _exit); + + int32_t size = blockIdx->size / sizeof(SDataBlk); + for (int32_t i = 0; i < size; ++i) { + code = TARRAY2_APPEND_PTR(reader->dataBlkArray, ((SDataBlk *)reader->config->bufArr[0]) + i); + TSDB_CHECK_CODE(code, lino, _exit); + } + + dataBlkArray[0] = reader->dataBlkArray; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; } int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData) { - // TODO - return 0; + int32_t code = 0; + int32_t lino = 0; + + code = tRealloc(&reader->config->bufArr[0], dataBlk->aSubBlock->szBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], dataBlk->aSubBlock->offset, reader->config->bufArr[0], + dataBlk->aSubBlock->szBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tDecmprBlockData(reader->config->bufArr[0], dataBlk->aSubBlock->szBlock, bData, &reader->config->bufArr[1]); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; } // SDataFileWriter ============================================= @@ -104,11 +235,10 @@ struct SDataFileWriter { int32_t dataBlkArrayIdx; SBlockData bData[1]; int32_t iRow; - - TABLEID tbid[1]; + TABLEID tbid[1]; } ctx[1]; - STFile file[TSDB_FTYPE_MAX]; + STFile files[TSDB_FTYPE_MAX]; STsdbFD *fd[TSDB_FTYPE_MAX]; SDataFooter footer[1]; @@ -123,14 +253,14 @@ static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - writer->footer->blockIdxPtr->offset = writer->file[TSDB_FTYPE_HEAD].size; + writer->footer->blockIdxPtr->offset = writer->files[TSDB_FTYPE_HEAD].size; writer->footer->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray); if (writer->footer->blockIdxPtr->size) { code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->footer->blockIdxPtr->offset, (void *)TARRAY2_DATA(writer->blockIdxArray), writer->footer->blockIdxPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file[TSDB_FTYPE_HEAD].size += writer->footer->blockIdxPtr->size; + writer->files[TSDB_FTYPE_HEAD].size += writer->footer->blockIdxPtr->size; } _exit: @@ -150,6 +280,39 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { return 0; } +static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + if (writer->config->files[i].exist) { + SDataFileReaderConfig config[1] = {{ + .tsdb = writer->config->tsdb, + .szPage = writer->config->szPage, + }}; + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + config->files[i].exist = writer->config->files[i].exist; + config->files[i].file = writer->config->files[i].file; + } + + code = tsdbDataFileReaderOpen(NULL, config, &writer->ctx->reader); + TSDB_CHECK_CODE(code, lino, _exit); + + break; + } + } + + code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; @@ -159,31 +322,48 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr; // open reader - if (writer->config->hasOldFile) { - // TODO + code = tsdbDataFileWriterDoOpenReader(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + // .head + writer->files[TSDB_FTYPE_HEAD] = (STFile){ + .type = TSDB_FTYPE_HEAD, + .did = writer->config->did, + .fid = writer->config->fid, + .cid = writer->config->cid, + .size = 0, + }; + + // .data + if (writer->config->files[TSDB_FTYPE_DATA].exist) { + writer->files[TSDB_FTYPE_DATA] = writer->config->files[TSDB_FTYPE_DATA].file; + } else { + writer->files[TSDB_FTYPE_DATA] = writer->files[TSDB_FTYPE_HEAD]; + writer->files[TSDB_FTYPE_DATA].type = TSDB_FTYPE_DATA; } - // open writer - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - char fname[TSDB_FILENAME_LEN]; - - tsdbTFileName(writer->config->tsdb, writer->file + i, fname); - int32_t flag = TD_FILE_WRITE; // TODO - - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[i]); - TSDB_CHECK_CODE(code, lino, _exit); - - // writer header - if (0) { - // TODO - } + // .sma + if (writer->config->files[TSDB_FTYPE_SMA].exist) { + writer->files[TSDB_FTYPE_SMA] = writer->config->files[TSDB_FTYPE_SMA].file; + } else { + writer->files[TSDB_FTYPE_SMA] = writer->files[TSDB_FTYPE_HEAD]; + writer->files[TSDB_FTYPE_SMA].type = TSDB_FTYPE_SMA; } - writer->ctx->opened = true; + // .tomb (todo) + writer->files[TSDB_FTYPE_TOMB] = (STFile){ + .type = TSDB_FTYPE_TOMB, + .did = writer->config->did, + .fid = writer->config->fid, + .cid = writer->config->cid, + .size = 0, + }; _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } else { + writer->ctx->opened = true; } return code; } @@ -228,16 +408,16 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); TSDB_CHECK_CODE(code, lino, _exit); - dataBlk->aSubBlock->offset = writer->file[TSDB_FTYPE_DATA].size; + dataBlk->aSubBlock->offset = writer->files[TSDB_FTYPE_DATA].size; dataBlk->aSubBlock->szKey = sizeArr[3] + sizeArr[2]; dataBlk->aSubBlock->szBlock = dataBlk->aSubBlock->szKey + sizeArr[1] + sizeArr[0]; for (int32_t i = 3; i >= 0; --i) { if (sizeArr[i]) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->file[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], sizeArr[i]); TSDB_CHECK_CODE(code, lino, _exit); - writer->file[TSDB_FTYPE_DATA].size += sizeArr[i]; + writer->files[TSDB_FTYPE_DATA].size += sizeArr[i]; } } @@ -260,14 +440,14 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b TSDB_CHECK_CODE(code, lino, _exit); } - dataBlk->smaInfo.offset = writer->file[TSDB_FTYPE_SMA].size; + dataBlk->smaInfo.offset = writer->files[TSDB_FTYPE_SMA].size; dataBlk->smaInfo.size = TARRAY2_DATA_LEN(smaArr); if (dataBlk->smaInfo.size) { code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], dataBlk->smaInfo.offset, (const uint8_t *)TARRAY2_DATA(smaArr), dataBlk->smaInfo.size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file[TSDB_FTYPE_SMA].size += dataBlk->smaInfo.size; + writer->files[TSDB_FTYPE_SMA].size += dataBlk->smaInfo.size; } TARRAY2_FREE(smaArr); @@ -276,9 +456,7 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); TSDB_CHECK_CODE(code, lino, _exit); - if (bData == writer->bData) { - tBlockDataClear(writer->bData); - } + tBlockDataClear(writer->bData); _exit: if (code) { @@ -296,13 +474,13 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA SBlockIdx blockIdx[1]; blockIdx->suid = writer->ctx->tbid->suid; blockIdx->uid = writer->ctx->tbid->uid; - blockIdx->offset = writer->file[TSDB_FTYPE_HEAD].size; + blockIdx->offset = writer->files[TSDB_FTYPE_HEAD].size; blockIdx->size = TARRAY2_DATA_LEN(dataBlkArray); code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray), blockIdx->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file[TSDB_FTYPE_HEAD].size += blockIdx->size; + writer->files[TSDB_FTYPE_HEAD].size += blockIdx->size; code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); TSDB_CHECK_CODE(code, lino, _exit); @@ -351,7 +529,6 @@ _exit: static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); while (writer->ctx->tbHasOldData) { for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { @@ -406,7 +583,7 @@ _do_write: _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -520,10 +697,10 @@ static int32_t tsdbDataFileWriteFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->file[TSDB_FTYPE_HEAD].size, (const uint8_t *)writer->footer, - sizeof(SDataFooter)); + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, + (const uint8_t *)writer->footer, sizeof(SDataFooter)); TSDB_CHECK_CODE(code, lino, _exit); - writer->file[TSDB_FTYPE_HEAD].size += sizeof(SDataFooter); + writer->files[TSDB_FTYPE_HEAD].size += sizeof(SDataFooter); _exit: if (code) { @@ -549,22 +726,71 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, STFileOp * code = tsdbDataFileWriteFooter(writer); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - if (writer->fd[i]) { - code = tsdbFsyncFile(writer->fd[i]); - TSDB_CHECK_CODE(code, lino, _exit); - - tsdbCloseFile(&writer->fd[i]); - } - } - // .head + int32_t ftype = TSDB_FTYPE_HEAD; + op[ftype] = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->fid, + .nf = writer->files[ftype], + }; // .data + ftype = TSDB_FTYPE_DATA; + if (writer->fd[ftype]) { + if (!writer->config->files[ftype].exist) { + op[ftype] = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->fid, + .nf = writer->files[ftype], + }; + } else if (writer->config->files[ftype].file.size == writer->files[ftype].size) { + op[ftype].optype = TSDB_FOP_NONE; + } else { + op[ftype] = (STFileOp){ + .optype = TSDB_FOP_MODIFY, + .fid = writer->config->fid, + .of = writer->config->files[ftype].file, + .nf = writer->files[ftype], + }; + } + } else { + op[ftype].optype = TSDB_FOP_NONE; + } // .sma + ftype = TSDB_FTYPE_SMA; + if (writer->fd[ftype]) { + if (!writer->config->files[ftype].exist) { + op[ftype] = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->fid, + .nf = writer->files[ftype], + }; + } else if (writer->config->files[ftype].file.size == writer->files[ftype].size) { + op[ftype].optype = TSDB_FOP_NONE; + } else { + op[ftype] = (STFileOp){ + .optype = TSDB_FOP_MODIFY, + .fid = writer->config->fid, + .of = writer->config->files[ftype].file, + .nf = writer->files[ftype], + }; + } + } else { + op[ftype].optype = TSDB_FOP_NONE; + } // .tomb + op[TSDB_FTYPE_TOMB] = (STFileOp){ + .optype = TSDB_FOP_NONE, + }; + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + if (!writer->fd[i]) continue; + code = tsdbFsyncFile(writer->fd[i]); + TSDB_CHECK_CODE(code, lino, _exit); + tsdbCloseFile(&writer->fd[i]); + } _exit: if (code) { @@ -578,7 +804,6 @@ int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWri if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; writer[0]->config[0] = config[0]; - writer[0]->ctx->opened = false; return 0; } @@ -610,6 +835,39 @@ _exit: return code; } +static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + for (int32_t i = 0; i < TSDB_FTYPE_TOMB /* TODO */; ++i) { + char fname[TSDB_FILENAME_LEN]; + int32_t flag = TD_FILE_READ | TD_FILE_WRITE; + + if (writer->files[i].size == 0) { + flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); + } + + tsdbTFileName(writer->config->tsdb, &writer->files[i], fname); + code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[i]); + TSDB_CHECK_CODE(code, lino, _exit); + + if (writer->files[i].size == 0) { + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + + code = tsdbWriteFile(writer->fd[i], 0, hdr, TSDB_FHDR_SIZE); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->files[i].size += TSDB_FHDR_SIZE; + } + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; @@ -619,6 +877,12 @@ int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } + // open FD + if (!writer->fd[TSDB_FTYPE_DATA]) { + code = tsdbDataFileWriterOpenDataFD(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } + if (row->uid != writer->ctx->tbid->uid) { code = tsdbDataFileWriteTableDataEnd(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -643,23 +907,38 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) int32_t code = 0; int32_t lino = 0; - // ASSERT(bData->uid); + ASSERT(bData->uid); - // if (!writer->ctx->opened) { - // code = tsdbDataFileWriterDoOpen(writer); - // TSDB_CHECK_CODE(code, lino, _exit); - // } + if (!writer->ctx->opened) { + code = tsdbDataFileWriterDoOpen(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - // if (bData->uid != writer->ctx->tbid->uid) { - // code = tsdbDataFileWriteTableDataEnd(writer); - // TSDB_CHECK_CODE(code, lino, _exit); + if (!writer->fd[TSDB_FTYPE_DATA]) { + code = tsdbDataFileWriterOpenDataFD(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - // code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData); - // TSDB_CHECK_CODE(code, lino, _exit); - // } + if (bData->uid != writer->ctx->tbid->uid) { + code = tsdbDataFileWriteTableDataEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); - // code = tsdbDataFileDoWriteTableDataBlock(writer, bData); - // TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!writer->ctx->tbHasOldData // + && writer->bData->nRow == 0 // + ) { + code = tsdbDataFileWriteDataBlock(writer, bData); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + for (int32_t i = 0; i < bData->nRow; ++i) { + TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)}; + code = tsdbDataFileDoWriteTSData(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + } + } _exit: if (code) { @@ -668,8 +947,11 @@ _exit: return code; } -int32_t tsdbDataFileFLushTSDataBlock(SDataFileWriter *writer) { - // if (writer->bData->nRow == 0) return 0; - // return tsdbDataFileDoWriteTableDataBlock(writer, writer->bData); - return 0; +int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer) { + ASSERT(writer->ctx->opened); + + if (writer->bData->nRow == 0) return 0; + if (writer->ctx->tbHasOldData) return 0; + + return tsdbDataFileWriteDataBlock(writer, writer->bData); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 3a32a0ff48..a9910ee0b4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -127,7 +127,7 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbDataFileFLushTSDataBlock(merger->dataWriter); + code = tsdbDataFileFlushTSDataBlock(merger->dataWriter); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) { @@ -406,54 +406,35 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { } if (merger->ctx->toData) { - // TODO - SDiskID did[1]; + SDiskID did; int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now); - if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) { + + if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) { code = TSDB_CODE_FS_NO_VALID_DISK; TSDB_CHECK_CODE(code, lino, _exit); } - SDataFileWriterConfig config = { + SDataFileWriterConfig config[1] = {{ .tsdb = merger->tsdb, + .cmprAlg = merger->cmprAlg, .maxRow = merger->maxRow, - .of = - { - [0] = - { - .type = TSDB_FTYPE_HEAD, - .did = did[0], - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .size = 0, - }, - [1] = - { - .type = TSDB_FTYPE_DATA, - .did = did[0], - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .size = 0, - }, - [2] = - { - .type = TSDB_FTYPE_SMA, - .did = did[0], - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .size = 0, - }, - [3] = - { - .type = TSDB_FTYPE_TOMB, - .did = did[0], - .fid = merger->ctx->fset->fid, - .cid = merger->cid, - .size = 0, - }, - }, - }; - code = tsdbDataFileWriterOpen(&config, &merger->dataWriter); + .szPage = merger->szPage, + .fid = merger->ctx->fset->fid, + .cid = merger->cid, + .did = did, + .compactVersion = merger->compactVersion, + }}; + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) { + if (merger->ctx->fset->farr[i]) { + config->files[i].exist = true; + config->files[i].file = merger->ctx->fset->farr[i]->f[0]; + } else { + config->files[i].exist = false; + } + } + + code = tsdbDataFileWriterOpen(config, &merger->dataWriter); TSDB_CHECK_CODE(code, lino, _exit); }