From 2f51c609f42370b18679307dbed628331145b6d0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 11 Apr 2023 15:01:42 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 29 ++- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c | 209 ++++++++++++------ 2 files changed, 164 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index fc4d8317f0..b9e064ec19 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -249,14 +249,27 @@ static int32_t end_commit_file_set(SCommitter *pCommitter) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp); - TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE( // + code = tsdbSttFWriterClose( // + &pCommitter->pWriter, // + 0, // + pFileOp), // + lino, // + _exit); _exit: if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + tsdbError( // + "vgId:%d failed at line %d since %s", // + TD_VID(pCommitter->pTsdb->pVnode), // + lino, // + tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid); + tsdbDebug( // + "vgId:%d %s done, fid:%d", // + TD_VID(pCommitter->pTsdb->pVnode), // + __func__, // + pCommitter->fid); } return code; } @@ -284,8 +297,12 @@ static int32_t commit_next_file_set(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pCommitter->pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index 43ad39adfa..f901160b8f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -60,8 +60,8 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) { } pSttBlk->suid = pBData->suid; - pSttBlk->minUid = pBData->aUid[0]; - pSttBlk->maxUid = pBData->aUid[pBData->nRow - 1]; + pSttBlk->minUid = pBData->uid ? pBData->uid : pBData->aUid[0]; + pSttBlk->maxUid = pBData->uid ? pBData->uid : pBData->aUid[pBData->nRow - 1]; pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0]; pSttBlk->minVer = pSttBlk->maxVer = pBData->aVersion[0]; pSttBlk->nRow = pBData->nRow; @@ -72,7 +72,6 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) { if (pSttBlk->maxVer < pBData->aVersion[iRow]) pSttBlk->maxVer = pBData->aVersion[iRow]; } - // compress data block TSDB_CHECK_CODE( // code = tCmprBlockData( // pBData, // @@ -178,6 +177,8 @@ static int32_t write_delete_block(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; + ASSERTS(0, "TODO: Not implemented yet"); + SDelBlk *pDelBlk = taosArrayReserve(pWriter->aDelBlk, 1); if (pDelBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -195,16 +196,16 @@ static int32_t write_delete_block(struct SSttFWriter *pWriter) { if (pDelBlk->maxVer < pWriter->sData.aData[2][iRow]) pDelBlk->maxVer = pWriter->sData.aData[2][iRow]; } - pDelBlk->dp.offset = pWriter->config.file.size; + pDelBlk->dp.offset = pWriter->tFile.size; pDelBlk->dp.size = 0; // TODO int64_t tsize = sizeof(int64_t) * pWriter->dData.nRow; for (int32_t i = 0; i < ARRAY_SIZE(pWriter->dData.aData); i++) { - code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, (const uint8_t *)pWriter->dData.aData[i], tsize); + code = tsdbWriteFile(pWriter->pFd, pWriter->tFile.size, (const uint8_t *)pWriter->dData.aData[i], tsize); TSDB_CHECK_CODE(code, lino, _exit); pDelBlk->dp.size += tsize; - pWriter->config.file.size += tsize; + pWriter->tFile.size += tsize; } tDelBlockDestroy(&pWriter->dData); @@ -223,21 +224,30 @@ static int32_t write_stt_blk(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[1].offset = pWriter->config.file.size; + pWriter->footer.dict[1].offset = pWriter->tFile.size; pWriter->footer.dict[1].size = sizeof(SSttBlk) * taosArrayGetSize(pWriter->aSttBlk); if (pWriter->footer.dict[1].size) { - code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aSttBlk), - pWriter->footer.dict[1].size); - TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE( // + code = tsdbWriteFile( // + pWriter->pFd, // + pWriter->tFile.size, // + TARRAY_DATA(pWriter->aSttBlk), // + pWriter->footer.dict[1].size), // + lino, // + _exit); - pWriter->config.file.size += pWriter->footer.dict[1].size; + pWriter->tFile.size += pWriter->footer.dict[1].size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pWriter->config.pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } return code; } @@ -246,21 +256,30 @@ static int32_t write_statistics_blk(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[2].offset = pWriter->config.file.size; + pWriter->footer.dict[2].offset = pWriter->tFile.size; pWriter->footer.dict[2].size = sizeof(STbStatisBlock) * taosArrayGetSize(pWriter->aStatisBlk); if (pWriter->footer.dict[2].size) { - code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aStatisBlk), - pWriter->footer.dict[2].size); - TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE( // + code = tsdbWriteFile( // + pWriter->pFd, // + pWriter->tFile.size, // + TARRAY_DATA(pWriter->aStatisBlk), // + pWriter->footer.dict[2].size), // + lino, // + _exit); - pWriter->config.file.size += pWriter->footer.dict[2].size; + pWriter->tFile.size += pWriter->footer.dict[2].size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pWriter->config.pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } return code; } @@ -269,29 +288,41 @@ static int32_t write_del_blk(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[3].offset = pWriter->config.file.size; + pWriter->footer.dict[3].offset = pWriter->tFile.size; pWriter->footer.dict[3].size = sizeof(SDelBlk) * taosArrayGetSize(pWriter->aDelBlk); if (pWriter->footer.dict[3].size) { - code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, TARRAY_DATA(pWriter->aDelBlk), - pWriter->footer.dict[3].size); - TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE( // + code = tsdbWriteFile( // + pWriter->pFd, // + pWriter->tFile.size, // + TARRAY_DATA(pWriter->aDelBlk), // + pWriter->footer.dict[3].size), // + lino, // + _exit); - pWriter->config.file.size += pWriter->footer.dict[3].size; + pWriter->tFile.size += pWriter->footer.dict[3].size; } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + TD_VID(pWriter->config.pTsdb->pVnode), // + __func__, // + lino, // + tstrerror(code)); } return code; } static int32_t write_file_footer(struct SSttFWriter *pWriter) { - int32_t code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, (const uint8_t *)&pWriter->footer, - sizeof(pWriter->footer)); - pWriter->config.file.size += sizeof(pWriter->footer); + int32_t code = tsdbWriteFile( // + pWriter->pFd, // + pWriter->tFile.size, // + (const uint8_t *)&pWriter->footer, // + sizeof(pWriter->footer)); + pWriter->tFile.size += sizeof(pWriter->footer); return code; } @@ -350,18 +381,19 @@ _exit: static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) { if (pWriter) { - for (int32_t i = 0; ARRAY_SIZE(pWriter->aBuf); i++) tFree(pWriter->aBuf[i]); + for (int32_t i = 0; i < ARRAY_SIZE(pWriter->aBuf); i++) { + tFree(pWriter->aBuf[i]); + } tDestroyTSchema(pWriter->skmRow.pTSchema); tDestroyTSchema(pWriter->skmTb.pTSchema); - // statistics data block - taosArrayDestroy(pWriter->aStatisBlk); + tTbStatisBlockDestroy(&pWriter->sData); - // deleted data block - taosArrayDestroy(pWriter->aDelBlk); tDelBlockDestroy(&pWriter->dData); - // time-series data block - taosArrayDestroy(pWriter->aSttBlk); tBlockDataDestroy(&pWriter->bData); + + taosArrayDestroy(pWriter->aStatisBlk); + taosArrayDestroy(pWriter->aDelBlk); + taosArrayDestroy(pWriter->aSttBlk); taosMemoryFree(pWriter); } return 0; @@ -453,45 +485,86 @@ int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct int32_t code = 0; int32_t lino; - if (ppWriter[0]->bData.nRow > 0) { - code = write_timeseries_block(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); + if (!abort) { + if (ppWriter[0]->bData.nRow > 0) { + TSDB_CHECK_CODE( // + code = write_timeseries_block(ppWriter[0]), // + lino, // + _exit); + } + + if (ppWriter[0]->sData.nRow > 0) { + TSDB_CHECK_CODE( // + code = write_statistics_block(ppWriter[0]), // + lino, // + _exit); + } + + if (ppWriter[0]->dData.nRow > 0) { + TSDB_CHECK_CODE( // + code = write_delete_block(ppWriter[0]), // + lino, // + _exit); + } + + TSDB_CHECK_CODE( // + code = write_stt_blk(ppWriter[0]), // + lino, // + _exit); + + TSDB_CHECK_CODE( // + code = write_statistics_blk(ppWriter[0]), // + lino, // + _exit); + + TSDB_CHECK_CODE( // + code = write_del_blk(ppWriter[0]), // + lino, // + _exit); + + TSDB_CHECK_CODE( // + code = write_file_footer(ppWriter[0]), // + lino, // + _exit); + + TSDB_CHECK_CODE( // + code = write_file_header(ppWriter[0]), // + lino, // + _exit); + + TSDB_CHECK_CODE( // + code = tsdbFsyncFile(ppWriter[0]->pFd), // + lino, // + _exit); + + if (op) { + op->oState = ppWriter[0]->config.file; + op->nState = ppWriter[0]->tFile; + if (op->oState.size == 0) { + op->op = TSDB_FOP_CREATE; + } else { + op->op = TSDB_FOP_EXTEND; + } + } } - if (ppWriter[0]->sData.nRow > 0) { - code = write_statistics_block(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (ppWriter[0]->dData.nRow > 0) { - code = write_delete_block(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = write_stt_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_statistics_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_del_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_file_footer(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_file_header(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = close_stt_fwriter(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); + TSDB_CHECK_CODE( // + code = close_stt_fwriter(ppWriter[0]), // + lino, // + _exit); destroy_stt_fwriter(ppWriter[0]); ppWriter[0] = NULL; _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vgId, __func__, lino, tstrerror(code)); + tsdbError( // + "vgId:%d %s failed at line %d since %s", // + vgId, // + __func__, // + lino, // + tstrerror(code)); + } else { } return code; }