diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 45c3de2347..c2fc55bc87 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -86,9 +86,9 @@ typedef struct SDataFileWriterConfig { int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, TFileOpArray *opArr); -int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row); -int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); -int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer); +int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row); +int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData); +int32_t tsdbDataFileFlush(SDataFileWriter *writer); int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index dc16b5803a..cbc06042f0 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -672,259 +672,6 @@ _exit: return code; } -static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *bData) { - if (bData->nRow == 0) return 0; - - ASSERT(bData->uid); - - int32_t code = 0; - int32_t lino = 0; - - SDataBlk dataBlk[1] = {{ - .minKey = - { - .ts = bData->aTSKEY[0], - .version = bData->aVersion[0], - }, - .maxKey = - { - .ts = bData->aTSKEY[bData->nRow - 1], - .version = bData->aVersion[bData->nRow - 1], - }, - .minVer = bData->aVersion[0], - .maxVer = bData->aVersion[0], - .nRow = bData->nRow, - .hasDup = 0, - .nSubBlock = 1, - }}; - - for (int32_t i = 1; i < bData->nRow; ++i) { - if (bData->aTSKEY[i] == bData->aTSKEY[i - 1]) { - dataBlk->hasDup = 1; - } - dataBlk->minVer = TMIN(dataBlk->minVer, bData->aVersion[i]); - dataBlk->maxVer = TMAX(dataBlk->maxVer, bData->aVersion[i]); - } - - int32_t sizeArr[5] = {0}; - - // to .data - code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); - TSDB_CHECK_CODE(code, lino, _exit); - - dataBlk->aSubBlock->offset = writer->files[TSDB_FTYPE_DATA].size; - dataBlk->aSubBlock->szKey = sizeArr[3] + sizeArr[2]; - dataBlk->aSubBlock->szBlock = dataBlk->aSubBlock->szKey + sizeArr[1] + sizeArr[0]; - - for (int32_t i = 3; i >= 0; --i) { - if (sizeArr[i]) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], - sizeArr[i]); - TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_DATA].size += sizeArr[i]; - } - } - - // to .sma - TColumnDataAggArray smaArr[1] = {0}; - - for (int32_t i = 0; i < bData->nColData; ++i) { - SColData *colData = bData->aColData + i; - - if ((!colData->smaOn) // - || ((colData->flag & HAS_VALUE) == 0) // - ) { - continue; - } - - SColumnDataAgg sma[1] = {{.colId = colData->cid}}; - tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); - - code = TARRAY2_APPEND_PTR(smaArr, sma); - TSDB_CHECK_CODE(code, lino, _exit); - } - - dataBlk->smaInfo.offset = writer->files[TSDB_FTYPE_SMA].size; - dataBlk->smaInfo.size = TARRAY2_DATA_LEN(smaArr); - - if (dataBlk->smaInfo.size) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], dataBlk->smaInfo.offset, (const uint8_t *)TARRAY2_DATA(smaArr), - dataBlk->smaInfo.size); - TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_SMA].size += dataBlk->smaInfo.size; - } - - TARRAY2_DESTROY(smaArr, NULL); - -#if 0 - // to dataBlkArray - code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); - TSDB_CHECK_CODE(code, lino, _exit); -#endif - - tBlockDataClear(bData); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkArray *dataBlkArray) { - if (TARRAY2_SIZE(dataBlkArray) == 0) return 0; - - int32_t code = 0; - int32_t lino = 0; - - int32_t ftype = TSDB_FTYPE_HEAD; - SBlockIdx blockIdx[1] = {{ - .suid = writer->ctx->tbid->suid, - .uid = writer->ctx->tbid->uid, - .offset = writer->files[ftype].size, - .size = TARRAY2_DATA_LEN(dataBlkArray), - }}; - - code = - tsdbWriteFile(writer->fd[ftype], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray), blockIdx->size); - TSDB_CHECK_CODE(code, lino, _exit); - writer->files[ftype].size += blockIdx->size; - -#if 0 - code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); -#endif - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) { - int32_t code = 0; - int32_t lino = 0; - - // update/append - if (row->type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow); - TSDB_CHECK_CODE(code, lino, _exit); - } - - TSDBKEY key[1] = {TSDBROW_KEY(row)}; - if (key->version <= writer->config->compactVersion // - && writer->blockData->nRow > 0 // - && writer->blockData->aTSKEY[writer->blockData->nRow - 1] == key->ts // - ) { - code = tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (writer->blockData->nRow >= writer->config->maxRow) { - code = tsdbDataFileWriteDataBlock(writer, writer->blockData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { - int32_t code = 0; - int32_t lino = 0; - - while (writer->ctx->tbHasOldData) { - for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { - TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx)}; - - int32_t c = tsdbRowCmprFn(row, row1); - ASSERT(c); - if (c > 0) { - code = tsdbDataFileDoWriteTSRow(writer, row1); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - goto _do_write; - } - } - -#if 0 - if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) { - writer->ctx->tbHasOldData = false; - break; - } - - for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) { - const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx); - - TSDBKEY key = TSDBROW_KEY(row); - SDataBlk dataBlk1[1] = {{ - .minKey = key, - .maxKey = key, - }}; - - int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1); - if (c < 0) { - code = tsdbDataFileWriteDataBlock(writer, writer->blockData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - } else if (c > 0) { - goto _do_write; - } else { - code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->blockData); - TSDB_CHECK_CODE(code, lino, _exit); - - writer->ctx->blockDataIdx = 0; - writer->ctx->dataBlkArrayIdx++; - break; - } - } -#endif - } - -_do_write: - code = tsdbDataFileDoWriteTSRow(writer, row); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { - if (writer->ctx->tbid->uid == 0) return 0; - - int32_t code = 0; - int32_t lino = 0; - - if (writer->ctx->tbHasOldData) { - for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { - TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); - code = tsdbDataFileDoWriteTSRow(writer, &row); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tsdbDataFileWriteDataBlock(writer, writer->blockData); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); - } - return code; -} - static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; @@ -1027,6 +774,308 @@ _exit: return code; } +static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { + if (bData->nRow == 0) return 0; + + ASSERT(bData->uid); + + int32_t code = 0; + int32_t lino = 0; + + SBrinRecord record[1] = {{ + .suid = bData->suid, + .uid = bData->uid, + .firstKey = bData->aTSKEY[0], + .firstKeyVer = bData->aVersion[0], + .lastKey = bData->aTSKEY[bData->nRow - 1], + .lastKeyVer = bData->aVersion[bData->nRow - 1], + .minVer = bData->aVersion[0], + .maxVer = bData->aVersion[0], + .blockOffset = writer->files[TSDB_FTYPE_DATA].size, + .smaOffset = writer->files[TSDB_FTYPE_SMA].size, + .blockSize = 0, + .blockKeySize = 0, + .smaSize = 0, + .numRow = bData->nRow, + .count = 1, + }}; + + for (int32_t i = 1; i < bData->nRow; ++i) { + if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) { + record->count++; + } + if (bData->aVersion[i] < record->minVer) { + record->minVer = bData->aVersion[i]; + } + if (bData->aVersion[i] > record->maxVer) { + record->maxVer = bData->aVersion[i]; + } + } + + // to .data file + int32_t sizeArr[5] = {0}; + + code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); + TSDB_CHECK_CODE(code, lino, _exit); + + record->blockKeySize = sizeArr[3] + sizeArr[2]; + record->blockSize = sizeArr[0] + sizeArr[1] + record->blockKeySize; + + for (int32_t i = 3; i >= 0; --i) { + if (sizeArr[i]) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], + sizeArr[i]); + TSDB_CHECK_CODE(code, lino, _exit); + writer->files[TSDB_FTYPE_DATA].size += sizeArr[i]; + } + } + + // to .sma file + TColumnDataAggArray smaArr[1]; + + TARRAY2_INIT(smaArr); + + for (int32_t i = 0; i < bData->nColData; ++i) { + SColData *colData = bData->aColData + i; + + if ((!colData->smaOn) // + || ((colData->flag & HAS_VALUE) == 0) // + ) { + continue; + } + + SColumnDataAgg sma[1] = {{.colId = colData->cid}}; + tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); + + code = TARRAY2_APPEND_PTR(smaArr, sma); + TSDB_CHECK_CODE(code, lino, _exit); + } + + record->smaSize = TARRAY2_DATA_LEN(smaArr); + + if (record->smaSize > 0) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, (const uint8_t *)TARRAY2_DATA(smaArr), + record->smaSize); + TSDB_CHECK_CODE(code, lino, _exit); + writer->files[TSDB_FTYPE_SMA].size += record->smaSize; + } + + TARRAY2_DESTROY(smaArr, NULL); + + // append SBrinRecord + code = tsdbDataFileWriteBrinRecord(writer, record); + TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataClear(bData); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkArray *dataBlkArray) { + if (TARRAY2_SIZE(dataBlkArray) == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + int32_t ftype = TSDB_FTYPE_HEAD; + SBlockIdx blockIdx[1] = {{ + .suid = writer->ctx->tbid->suid, + .uid = writer->ctx->tbid->uid, + .offset = writer->files[ftype].size, + .size = TARRAY2_DATA_LEN(dataBlkArray), + }}; + + code = + tsdbWriteFile(writer->fd[ftype], blockIdx->offset, (const uint8_t *)TARRAY2_DATA(dataBlkArray), blockIdx->size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->files[ftype].size += blockIdx->size; + +#if 0 + code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx); + TSDB_CHECK_CODE(code, lino, _exit); +#endif + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) { + int32_t code = 0; + int32_t lino = 0; + + // update/append + if (row->type == TSDBROW_ROW_FMT) { + code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(row), writer->config->skmRow); + TSDB_CHECK_CODE(code, lino, _exit); + } + + TSDBKEY key[1]; + if (row->type == TSDBROW_ROW_FMT) { + key->ts = row->pTSRow->ts; + key->version = row->version; + } else { + key->ts = row->pBlockData->aTSKEY[row->iRow]; + key->version = row->pBlockData->aVersion[row->iRow]; + } + if (key->version <= writer->config->compactVersion // + && writer->blockData->nRow > 0 // + && writer->blockData->aTSKEY[writer->blockData->nRow - 1] == key->ts // + ) { + code = tBlockDataUpdateRow(writer->blockData, row, writer->config->skmRow->pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (writer->blockData->nRow >= writer->config->maxRow) { + code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(writer->blockData, row, writer->config->skmRow->pTSchema, writer->ctx->tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) { + int32_t code = 0; + int32_t lino = 0; + + if (writer->ctx->tbHasOldData) { + TSDBKEY key[1]; + if (row->type == TSDBROW_ROW_FMT) { + key->ts = row->pTSRow->ts; + key->version = row->version; + } else { + key->ts = row->pBlockData->aTSKEY[row->iRow]; + key->version = row->pBlockData->aVersion[row->iRow]; + } + + for (;;) { + for (;;) { + // SBlockData + for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { + if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] // + || (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] && + key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) { + goto _do_write; + } else { + TSDBROW row1 = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + code = tsdbDataFileDoWriteTSRow(writer, &row1); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + // SBrinBlock + if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) { + break; + } + + for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { + if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) { + writer->ctx->tbHasOldData = false; + goto _do_write; + } + + if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) // + || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) && + key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) { + goto _do_write; + } else { + SBrinRecord record[1]; + tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); + if (key->ts > TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) // + || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->lastKey, writer->ctx->brinBlockIdx) && + key->version > TARRAY2_GET(writer->ctx->brinBlock->lastKeyVer, writer->ctx->brinBlockIdx))) { + if (writer->blockData->nRow > 0) { + code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDataFileWriteBrinRecord(writer, record); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileReadBlockData(writer->ctx->reader, record, writer->ctx->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->blockDataIdx = 0; + writer->ctx->brinBlockIdx++; + break; + } + } + } + } + + // SBrinBlk + if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) { + writer->ctx->brinBlkArray = NULL; + writer->ctx->tbHasOldData = false; + goto _do_write; + } + + for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { + const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); + + if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) { + writer->ctx->tbHasOldData = false; + goto _do_write; + } + + code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->brinBlockIdx = 0; + writer->ctx->brinBlkArrayIdx++; + break; + } + } + } + +_do_write: + code = tsdbDataFileDoWriteTSRow(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { + if (writer->ctx->tbid->uid == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + if (writer->ctx->tbHasOldData) { + for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { + TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + code = tsdbDataFileDoWriteTSRow(writer, &row); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TABLEID *tbid) { int32_t code = 0; int32_t lino = 0; @@ -1498,7 +1547,7 @@ _exit: return code; } -int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SRowInfo *row) { +int32_t tsdbDataFileWriteRow(SDataFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; @@ -1530,7 +1579,7 @@ _exit: return code; } -int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) { +int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { if (bData->nRow == 0) return 0; int32_t code = 0; @@ -1543,7 +1592,7 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) TSDB_CHECK_CODE(code, lino, _exit); } - if (!writer->fd[TSDB_FTYPE_DATA]) { + if (writer->fd[TSDB_FTYPE_DATA] == NULL) { code = tsdbDataFileWriterOpenDataFD(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1559,7 +1608,7 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) if (!writer->ctx->tbHasOldData // && writer->blockData->nRow == 0 // ) { - code = tsdbDataFileWriteDataBlock(writer, bData); + code = tsdbDataFileDoWriteBlockData(writer, bData); TSDB_CHECK_CODE(code, lino, _exit); } else { for (int32_t i = 0; i < bData->nRow; ++i) { @@ -1576,13 +1625,13 @@ _exit: return code; } -int32_t tsdbDataFileFlushTSDataBlock(SDataFileWriter *writer) { +int32_t tsdbDataFileFlush(SDataFileWriter *writer) { ASSERT(writer->ctx->opened); if (writer->blockData->nRow == 0) return 0; if (writer->ctx->tbHasOldData) return 0; - return tsdbDataFileWriteDataBlock(writer, writer->blockData); + return tsdbDataFileDoWriteBlockData(writer, writer->blockData); } static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index ea732537ab..085e5cf46f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -130,35 +130,35 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { for (int32_t i = 0; i < numRow; i++) { row->row.iRow = i; - code = tsdbDataFileWriteTSData(merger->dataWriter, row); + code = tsdbDataFileWriteRow(merger->dataWriter, row); TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbDataFileFlushTSDataBlock(merger->dataWriter); + code = tsdbDataFileFlush(merger->dataWriter); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) { row->row.iRow = i; - code = tsdbDataFileWriteTSData(merger->dataWriter, row); + code = tsdbDataFileWriteRow(merger->dataWriter, row); TSDB_CHECK_CODE(code, lino, _exit); } row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0); for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) { row->row.iRow = i; - code = tsdbDataFileWriteTSData(merger->dataWriter, row); + code = tsdbDataFileWriteRow(merger->dataWriter, row); TSDB_CHECK_CODE(code, lino, _exit); } } else { if (merger->ctx->bData[pidx].nRow > 0) { - code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx); + code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx); TSDB_CHECK_CODE(code, lino, _exit); } if (merger->ctx->bData[cidx].nRow < merger->minRow) { code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx); TSDB_CHECK_CODE(code, lino, _exit); } else { - code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx); + code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + cidx); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -224,7 +224,7 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) { if (merger->ctx->bData[merger->ctx->bDataIdx].nRow >= merger->maxRow) { int32_t idx = (merger->ctx->bDataIdx + 1) % 2; - code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + idx); + code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->bData + idx); TSDB_CHECK_CODE(code, lino, _exit); tBlockDataClear(merger->ctx->bData + idx);