From da31d92b7e861e93ab533102ae236acb09f24c6d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 2 Jun 2023 14:39:54 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/inc/tsdbDataFileRW.h | 2 + .../dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c | 610 ++++++++++-------- 2 files changed, 352 insertions(+), 260 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index b3542ac642..001d749e25 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -26,6 +26,7 @@ extern "C" { typedef TARRAY2(SBlockIdx) TBlockIdxArray; typedef TARRAY2(SDataBlk) TDataBlkArray; +typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray; // SDataFileReader ============================================= typedef struct SDataFileReader SDataFileReader; @@ -50,6 +51,7 @@ typedef struct SDataFileWriterConfig { STsdb *tsdb; bool hasOldFile; STFile of[TSDB_FTYPE_MAX]; + int8_t cmprAlg; int32_t maxRow; int32_t szPage; int64_t cid; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index e8275a266d..c237b01ac2 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -17,7 +17,6 @@ typedef struct { SFDataPtr blockIdxPtr[1]; - SFDataPtr dataBlkPtr[1]; SFDataPtr rsrvd[2]; } SDataFooter; @@ -101,11 +100,12 @@ struct SDataFileWriter { const TBlockIdxArray *blockIdxArray; int32_t blockIdxArrayIdx; bool tbHasOldData; - TABLEID tbid[1]; const TDataBlkArray *dataBlkArray; int32_t dataBlkArrayIdx; SBlockData bData[1]; int32_t iRow; + + TABLEID tbid[1]; } ctx[1]; STFile file[TSDB_FTYPE_MAX]; @@ -119,68 +119,29 @@ struct SDataFileWriter { STbStatisBlock sData[1]; }; -int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) { - writer[0] = taosMemoryCalloc(1, sizeof(SDataFileWriter)); - if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; - - writer[0]->config[0] = config[0]; - writer[0]->ctx->opened = false; - return 0; -} - -static int32_t tsdbDataFileWriteRemainData(SDataFileWriter *writer) { - // TODO - return 0; -} - static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); - int64_t offset = writer->file[TSDB_FTYPE_HEAD].size; - int64_t size = TARRAY2_DATA_LEN(writer->dataBlkArray); - if (TARRAY2_SIZE(writer->blockIdxArray) > 0) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], offset, (void *)TARRAY2_DATA(writer->blockIdxArray), size); + writer->footer->blockIdxPtr->offset = writer->file[TSDB_FTYPE_HEAD].size; + writer->footer->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray); + + if (writer->footer->blockIdxPtr->size) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->footer->blockIdxPtr->offset, + (void *)TARRAY2_DATA(writer->blockIdxArray), writer->footer->blockIdxPtr->size); TSDB_CHECK_CODE(code, lino, _exit); + writer->file[TSDB_FTYPE_HEAD].size += writer->footer->blockIdxPtr->size; } _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; -} - -static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); - - code = tsdbDataFileWriteRemainData(writer); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFileWriteBlockIdx(writer); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - if (writer->fd[i]) { - code = tsdbFsyncFile(writer->fd[i]); - TSDB_CHECK_CODE(code, lino, _exit); - - tsdbCloseFile(&writer->fd[i]); - } - } - -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) { - // TODO + ASSERT(0); return 0; } @@ -189,35 +150,6 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { return 0; } -int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode); - - if (!writer[0]->ctx->opened) { - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { - op[i].optype = TSDB_FOP_NONE; - } - } else { - if (abort) { - code = tsdbDataFileWriterCloseAbort(writer[0]); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDataFileWriterCloseCommit(writer[0]); - TSDB_CHECK_CODE(code, lino, _exit); - } - tsdbDataFileWriterDoClose(writer[0]); - } - taosMemoryFree(writer); - writer[0] = NULL; - -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; -} - static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; @@ -256,27 +188,101 @@ _exit: return code; } -static int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { +static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *bData) { if (bData->nRow == 0) return 0; + ASSERT(bData->uid); + int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); - SDataBlk dataBlk[1]; + SDataBlk dataBlk[1] = {{ + .minKey = + { + .ts = bData->aTSKEY[0], + .version = bData->aVersion[0], + }, + .maxKey = + { + .ts = bData->aTSKEY[bData->nRow - 1], + .version = bData->aVersion[bData->nRow - 1], + }, + .minVer = bData->aVersion[0], + .maxVer = bData->aVersion[0], + .nRow = bData->nRow, + .hasDup = 0, + .nSubBlock = 1, + }}; - // TODO: fill dataBlk + for (int32_t i = 1; i < bData->nRow; ++i) { + if (bData->aTSKEY[i] == bData->aTSKEY[i - 1]) { + dataBlk->hasDup = 1; + } + dataBlk->minVer = TMIN(dataBlk->minVer, bData->aVersion[i]); + dataBlk->maxVer = TMAX(dataBlk->maxVer, bData->aVersion[i]); + } - // TODO: write data + int32_t sizeArr[5] = {0}; + // to .data + code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); + TSDB_CHECK_CODE(code, lino, _exit); + + dataBlk->aSubBlock->offset = writer->file[TSDB_FTYPE_DATA].size; + dataBlk->aSubBlock->szKey = sizeArr[3] + sizeArr[2]; + dataBlk->aSubBlock->szBlock = dataBlk->aSubBlock->szKey + sizeArr[1] + sizeArr[0]; + + for (int32_t i = 3; i >= 0; --i) { + if (sizeArr[i]) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->file[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], + sizeArr[i]); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file[TSDB_FTYPE_DATA].size += sizeArr[i]; + } + } + + // to .sma + TColumnDataAggArray smaArr[1] = {0}; + + for (int32_t i = 0; i < bData->nColData; ++i) { + SColData *colData = bData->aColData + i; + + if ((!colData->smaOn) // + || ((colData->flag & HAS_VALUE) == 0) // + ) { + continue; + } + + SColumnDataAgg sma[1] = {{.colId = colData->cid}}; + tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); + + code = TARRAY2_APPEND_PTR(smaArr, sma); + TSDB_CHECK_CODE(code, lino, _exit); + } + + dataBlk->smaInfo.offset = writer->file[TSDB_FTYPE_SMA].size; + dataBlk->smaInfo.size = TARRAY2_DATA_LEN(smaArr); + + if (dataBlk->smaInfo.size) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], dataBlk->smaInfo.offset, (const uint8_t *)TARRAY2_DATA(smaArr), + dataBlk->smaInfo.size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file[TSDB_FTYPE_SMA].size += dataBlk->smaInfo.size; + } + + TARRAY2_FREE(smaArr); + + // to dataBlkArray code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); TSDB_CHECK_CODE(code, lino, _exit); - tBlockDataClear(bData); + if (bData == writer->bData) { + tBlockDataClear(writer->bData); + } _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -286,7 +292,6 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); SBlockIdx blockIdx[1]; blockIdx->suid = writer->ctx->tbid->suid; @@ -304,7 +309,7 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -312,20 +317,93 @@ _exit: static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); + // update/append if (row->type == TSDBROW_ROW_FMT) { - // TODO: udpate row schema - } - - code = tBlockDataAppendRow(writer->ctx->bData, row, NULL /* TODO */, writer->ctx->tbid->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (writer->ctx->bData->nRow >= writer->config->maxRow) { - code = tsdbDataFileWriteBlockData(writer, writer->bData); + code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow); TSDB_CHECK_CODE(code, lino, _exit); } + TSDBKEY key[1] = {TSDBROW_KEY(row)}; + if (key->version <= writer->config->compactVersion // + && writer->bData->nRow > 0 // + && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // + ) { + code = tBlockDataUpdateRow(writer->bData, row, writer->config->skmRow->pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (writer->bData->nRow >= writer->config->maxRow) { + code = tsdbDataFileWriteDataBlock(writer, writer->bData); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(writer->bData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + + while (writer->ctx->tbHasOldData) { + for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { + TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; + + int32_t c = tsdbRowCmprFn(row, row1); + ASSERT(c); + if (c > 0) { + code = tsdbDataFileDoWriteTSRow(writer, row1); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + goto _do_write; + } + } + + if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) { + writer->ctx->tbHasOldData = false; + break; + } + + for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { + const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx); + TSDBKEY key = TSDBROW_KEY(row); + SDataBlk dataBlk1[1] = {{ + .minKey = key, + .maxKey = key, + }}; + + int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1); + if (c < 0) { + code = tsdbDataFileWriteDataBlock(writer, writer->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + } else if (c > 0) { + goto _do_write; + } else { + code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->iRow = 0; + writer->ctx->dataBlkArrayIdx++; + break; + } + } + } + +_do_write: + code = tsdbDataFileDoWriteTSRow(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -339,6 +417,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; + // handle table remain data if (writer->ctx->tbHasOldData) { for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { TSDBROW row[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; @@ -347,7 +426,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbDataFileWriteBlockData(writer, writer->bData); + code = tsdbDataFileWriteDataBlock(writer, writer->bData); TSDB_CHECK_CODE(code, lino, _exit); for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { @@ -355,9 +434,11 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx)); TSDB_CHECK_CODE(code, lino, _exit); } + + writer->ctx->tbHasOldData = false; } - code = tsdbDataFileWriteBlockData(writer, writer->bData); + code = tsdbDataFileWriteDataBlock(writer, writer->bData); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFileWriteDataBlk(writer, writer->dataBlkArray); @@ -371,154 +452,61 @@ _exit: } static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); + int32_t code = 0; + int32_t lino = 0; + SMetaInfo info; writer->ctx->tbHasOldData = false; - for (; writer->ctx->blockIdxArrayIdx < TARRAY2_SIZE(writer->ctx->blockIdxArray); writer->ctx->blockIdxArrayIdx++) { - const SBlockIdx *blockIdx = TARRAY2_GET_PTR(writer->ctx->blockIdxArray, writer->ctx->blockIdxArrayIdx); + // skip data of previous table + if (writer->ctx->blockIdxArray) { + for (; writer->ctx->blockIdxArrayIdx < TARRAY2_SIZE(writer->ctx->blockIdxArray); writer->ctx->blockIdxArrayIdx++) { + const SBlockIdx *blockIdx = TARRAY2_GET_PTR(writer->ctx->blockIdxArray, writer->ctx->blockIdxArrayIdx); - int32_t c = tTABLEIDCmprFn(blockIdx, tbid); - if (c < 0) { - SMetaInfo info; - if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, blockIdx->suid, &info, NULL) == 0) { - code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); - - writer->ctx->tbid->suid = blockIdx->suid; - writer->ctx->tbid->uid = blockIdx->uid; - - code = tsdbDataFileWriteDataBlk(writer, writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - if (c == 0) { - writer->ctx->tbHasOldData = true; - code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); - TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->dataBlkArrayIdx = 0; - tBlockDataReset(writer->ctx->bData); - writer->ctx->iRow = 0; - writer->ctx->blockIdxArrayIdx++; - } - break; - } - } - - writer->ctx->tbid[0] = tbid[0]; - -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; -} - -static int32_t tsdbDataFileDoWriteTableDataRow(SDataFileWriter *writer, TSDBROW *row) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); - - while (writer->ctx->tbHasOldData) { - for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) { - TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)}; - - int32_t c = tsdbRowCmprFn(row, row1); - ASSERT(c); - if (row > 0) { - code = tsdbDataFileDoWriteTSRow(writer, row1); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - goto _write_row; - } - } - - for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { - const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx); - SDataBlk dataBlk1[1] = {{.minKey = {}, .maxKey = {}}}; // TODO - - int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1); + int32_t c = tTABLEIDCmprFn(blockIdx, tbid); if (c < 0) { - code = tsdbDataFileWriteBlockData(writer, writer->bData); - TSDB_CHECK_CODE(code, lino, _exit); + if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, blockIdx->uid, &info, NULL) == 0) { + code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); + TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - } else if (c > 0) { - goto _write_row; + writer->ctx->tbid->suid = blockIdx->suid; + writer->ctx->tbid->uid = blockIdx->uid; + + code = tsdbDataFileWriteDataBlk(writer, writer->ctx->dataBlkArray); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + continue; + } } else { - code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->bData); - TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->iRow = 0; - writer->ctx->dataBlkArrayIdx++; + if (c == 0) { + writer->ctx->tbHasOldData = true; + + code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->dataBlkArrayIdx = 0; + + tBlockDataReset(writer->ctx->bData); + writer->ctx->iRow = 0; + + writer->ctx->blockIdxArrayIdx++; + } break; } } - - if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray) // - && writer->ctx->iRow >= writer->ctx->bData->nRow) { - writer->ctx->tbHasOldData = false; - } } -_write_row: - code = tsdbDataFileDoWriteTSRow(writer, row); + // make sure state is correct + writer->ctx->tbid[0] = tbid[0]; + + if (tbid->suid == INT64_MAX && tbid->uid == INT64_MAX) goto _exit; + + TARRAY2_CLEAR(writer->dataBlkArray, NULL); + + code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb); TSDB_CHECK_CODE(code, lino, _exit); -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; -} - -static int32_t tsdbDataFileDoWriteTableDataBlock(SDataFileWriter *writer, SBlockData *bData) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); - - if (!writer->ctx->tbHasOldData && writer->bData->nRow == 0) { - code = tsdbDataFileWriteBlockData(writer, bData); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - for (int32_t i = 0; i < bData->nRow; i++) { - TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)}; - code = tsdbDataFileDoWriteTableDataRow(writer, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - -_exit: - if (code) { - TSDB_ERROR_LOG(vid, lino, code); - } - return code; -} - -int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) { - if (bData->nRow == 0) return 0; - - int32_t code = 0; - int32_t lino = 0; - - ASSERT(bData->uid); - - if (!writer->ctx->opened) { - code = tsdbDataFileWriterDoOpen(writer); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (bData->uid != writer->ctx->tbid->uid) { - code = tsdbDataFileWriteTableDataEnd(writer); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDataFileDoWriteTableDataBlock(writer, bData); + code = tBlockDataInit(writer->bData, writer->ctx->tbid, writer->config->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -528,6 +516,100 @@ _exit: return code; } +static int32_t tsdbDataFileWriteFooter(SDataFileWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->file[TSDB_FTYPE_HEAD].size, (const uint8_t *)writer->footer, + sizeof(SDataFooter)); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file[TSDB_FTYPE_HEAD].size += sizeof(SDataFooter); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, STFileOp *op) { + int32_t code = 0; + int32_t lino = 0; + TABLEID tbid[1] = {{INT64_MAX, INT64_MAX}}; + + code = tsdbDataFileWriteTableDataEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileWriteTableDataBegin(writer, tbid); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileWriteBlockIdx(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileWriteFooter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + if (writer->fd[i]) { + code = tsdbFsyncFile(writer->fd[i]); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCloseFile(&writer->fd[i]); + } + } + + // .head + + // .data + + // .sma + + // .tomb + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) { + writer[0] = taosMemoryCalloc(1, sizeof(SDataFileWriter)); + if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->config[0] = config[0]; + writer[0]->ctx->opened = false; + return 0; +} + +int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) { + int32_t code = 0; + int32_t lino = 0; + + if (!writer[0]->ctx->opened) { + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { + op[i].optype = TSDB_FOP_NONE; + } + } else { + if (abort) { + code = tsdbDataFileWriterCloseAbort(writer[0]); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileWriterCloseCommit(writer[0], op); + TSDB_CHECK_CODE(code, lino, _exit); + } + tsdbDataFileWriterDoClose(writer[0]); + } + taosMemoryFree(writer); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code); + } + return code; +} + int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; @@ -541,36 +623,43 @@ int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { code = tsdbDataFileWriteTableDataEnd(writer); TSDB_CHECK_CODE(code, lino, _exit); - writer->ctx->tbid->suid = row->uid; - writer->ctx->tbid->uid = row->uid; - code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)row); TSDB_CHECK_CODE(code, lino, _exit); } - if (row->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, // - TSDBROW_SVERSION(&row->row), writer->config->skmRow); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbDataFileDoWriteTSData(writer, &row->row); + TSDB_CHECK_CODE(code, lino, _exit); - // update/append - TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; - if (key->version <= writer->config->compactVersion // - && writer->bData->nRow > 0 // - && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // - ) { - code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (writer->bData->nRow >= writer->config->maxRow) { - code = tsdbDataFileDoWriteTableDataBlock(writer, writer->bData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid); - TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } + return code; +} + +int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) { + if (bData->nRow == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // ASSERT(bData->uid); + + // if (!writer->ctx->opened) { + // code = tsdbDataFileWriterDoOpen(writer); + // TSDB_CHECK_CODE(code, lino, _exit); + // } + + // if (bData->uid != writer->ctx->tbid->uid) { + // code = tsdbDataFileWriteTableDataEnd(writer); + // TSDB_CHECK_CODE(code, lino, _exit); + + // code = tsdbDataFileWriteTableDataBegin(writer, (TABLEID *)bData); + // TSDB_CHECK_CODE(code, lino, _exit); + // } + + // code = tsdbDataFileDoWriteTableDataBlock(writer, bData); + // TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -580,6 +669,7 @@ _exit: } int32_t tsdbDataFileFLushTSDataBlock(SDataFileWriter *writer) { - if (writer->bData->nRow == 0) return 0; - return tsdbDataFileDoWriteTableDataBlock(writer, writer->bData); -} \ No newline at end of file + // if (writer->bData->nRow == 0) return 0; + // return tsdbDataFileDoWriteTableDataBlock(writer, writer->bData); + return 0; +}