From 88907b1996e0ab897f75a6c38727001c37c90380 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 30 Mar 2023 17:05:58 +0800 Subject: [PATCH] more code --- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c | 64 +++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index 076579e7f0..2b981c8dff 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -104,6 +104,21 @@ _exit: return code; } +static int32_t write_statistics_block(struct SSttFWriter *pWriter) { + int32_t code = 0; + + tTbStatisBlockClear(&pWriter->sData); + // TODO + + return code; +} + +static int32_t write_delete_block(struct SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + static int32_t write_stt_blk(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; @@ -217,14 +232,19 @@ static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr)); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { + if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, tstrerror(code)); } @@ -248,8 +268,8 @@ int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWrit _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code)); if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -287,12 +307,28 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW int32_t code = 0; int32_t lino; + TSDBKEY key = TSDBROW_KEY(pRow); + if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { if (pWriter->bData.nRow > 0) { code = write_timeseries_block(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } + if (pWriter->sData.nRow >= pWriter->config.maxRow) { + code = write_statistics_block(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid + pWriter->sData.aData[1][pWriter->sData.nRow] = tbid->uid; // uid + pWriter->sData.aData[2][pWriter->sData.nRow] = key.ts; // skey + pWriter->sData.aData[3][pWriter->sData.nRow] = key.version; // sver + pWriter->sData.aData[4][pWriter->sData.nRow] = key.ts; // ekey + pWriter->sData.aData[5][pWriter->sData.nRow] = key.version; // ever + pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count + pWriter->sData.nRow++; + code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb); TSDB_CHECK_CODE(code, lino, _exit); @@ -314,6 +350,17 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW TSDB_CHECK_CODE(code, lino, _exit); } + if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { + pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey + pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever + pWriter->sData.aData[6][pWriter->sData.nRow - 1]++; // count + } else if (key.ts == pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { + pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey + pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever + } else { + ASSERTS(0, "timestamp should be in ascending order"); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, @@ -323,7 +370,16 @@ _exit: } int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { - int32_t code = 0; - // TODO - return code; + pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid + pWriter->dData.aData[1][pWriter->dData.nRow] = tbid->uid; // uid + pWriter->dData.aData[2][pWriter->dData.nRow] = pDelData->version; // version + pWriter->dData.aData[3][pWriter->dData.nRow] = pDelData->sKey; // skey + pWriter->dData.aData[4][pWriter->dData.nRow] = pDelData->eKey; // ekey + pWriter->dData.nRow++; + + if (pWriter->dData.nRow >= pWriter->config.maxRow) { + return write_delete_block(pWriter); + } else { + return 0; + } } \ No newline at end of file