diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 4e4163f135..a333ba036d 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -1,75 +1,83 @@ # vnode add_library(vnode STATIC "") +set( + VNODE_SOURCE_FILES + "src/vnd/vnodeOpen.c" + "src/vnd/vnodeBufPool.c" + "src/vnd/vnodeCfg.c" + "src/vnd/vnodeCommit.c" + "src/vnd/vnodeQuery.c" + "src/vnd/vnodeModule.c" + "src/vnd/vnodeSvr.c" + "src/vnd/vnodeSync.c" + "src/vnd/vnodeSnapshot.c" + "src/vnd/vnodeRetention.c" + + # meta + "src/meta/metaOpen.c" + "src/meta/metaIdx.c" + "src/meta/metaTable.c" + "src/meta/metaSma.c" + "src/meta/metaQuery.c" + "src/meta/metaCommit.c" + "src/meta/metaEntry.c" + "src/meta/metaSnapshot.c" + "src/meta/metaCache.c" + + # sma + "src/sma/smaEnv.c" + "src/sma/smaUtil.c" + "src/sma/smaFS.c" + "src/sma/smaOpen.c" + "src/sma/smaCommit.c" + "src/sma/smaRollup.c" + "src/sma/smaSnapshot.c" + "src/sma/smaTimeRange.c" + + # tsdb + "src/tsdb/tsdbCommit.c" + "src/tsdb/tsdbFile.c" + "src/tsdb/tsdbFS.c" + "src/tsdb/tsdbOpen.c" + "src/tsdb/tsdbMemTable.c" + "src/tsdb/tsdbRead.c" + "src/tsdb/tsdbCache.c" + "src/tsdb/tsdbWrite.c" + "src/tsdb/tsdbReaderWriter.c" + "src/tsdb/tsdbUtil.c" + "src/tsdb/tsdbSnapshot.c" + "src/tsdb/tsdbCacheRead.c" + "src/tsdb/tsdbRetention.c" + "src/tsdb/tsdbDiskData.c" + "src/tsdb/tsdbMergeTree.c" + "src/tsdb/tsdbDataIter.c" + + # tq + "src/tq/tq.c" + "src/tq/tqExec.c" + "src/tq/tqMeta.c" + "src/tq/tqRead.c" + "src/tq/tqOffset.c" + "src/tq/tqPush.c" + "src/tq/tqSink.c" + "src/tq/tqCommit.c" + "src/tq/tqSnapshot.c" + "src/tq/tqOffsetSnapshot.c" +) + +if (USE_DEV_CODE) + aux_source_directory("src/tsdb/dev" VNODE_SOURCE_DEV_FILES) + list( + APPEND + VNODE_SOURCE_FILES + ${VNODE_SOURCE_DEV_FILES} + ) +endif(USE_DEV_CODE) + target_sources( vnode PRIVATE - - # vnode - "src/vnd/vnodeOpen.c" - "src/vnd/vnodeBufPool.c" - "src/vnd/vnodeCfg.c" - "src/vnd/vnodeCommit.c" - "src/vnd/vnodeQuery.c" - "src/vnd/vnodeModule.c" - "src/vnd/vnodeSvr.c" - "src/vnd/vnodeSync.c" - "src/vnd/vnodeSnapshot.c" - "src/vnd/vnodeRetention.c" - - # meta - "src/meta/metaOpen.c" - "src/meta/metaIdx.c" - "src/meta/metaTable.c" - "src/meta/metaSma.c" - "src/meta/metaQuery.c" - "src/meta/metaCommit.c" - "src/meta/metaEntry.c" - "src/meta/metaSnapshot.c" - "src/meta/metaCache.c" - - # sma - "src/sma/smaEnv.c" - "src/sma/smaUtil.c" - "src/sma/smaFS.c" - "src/sma/smaOpen.c" - "src/sma/smaCommit.c" - "src/sma/smaRollup.c" - "src/sma/smaSnapshot.c" - "src/sma/smaTimeRange.c" - - # tsdb - "src/tsdb/tsdbCommit.c" - "src/tsdb/tsdbFile.c" - "src/tsdb/tsdbFS.c" - "src/tsdb/tsdbOpen.c" - "src/tsdb/tsdbMemTable.c" - "src/tsdb/tsdbRead.c" - "src/tsdb/tsdbCache.c" - "src/tsdb/tsdbWrite.c" - "src/tsdb/tsdbReaderWriter.c" - "src/tsdb/tsdbUtil.c" - "src/tsdb/tsdbSnapshot.c" - "src/tsdb/tsdbCacheRead.c" - "src/tsdb/tsdbRetention.c" - "src/tsdb/tsdbDiskData.c" - "src/tsdb/tsdbMergeTree.c" - "src/tsdb/tsdbDataIter.c" - # # dev - "src/tsdb/dev/tsdbCommit.c" - "src/tsdb/dev/tsdbMerge.c" - "src/tsdb/dev/tsdbSttFWriter.c" - - # tq - "src/tq/tq.c" - "src/tq/tqExec.c" - "src/tq/tqMeta.c" - "src/tq/tqRead.c" - "src/tq/tqOffset.c" - "src/tq/tqPush.c" - "src/tq/tqSink.c" - "src/tq/tqCommit.c" - "src/tq/tqSnapshot.c" - "src/tq/tqOffsetSnapshot.c" + ${VNODE_SOURCE_FILES} ) IF (TD_VNODE_PLUGINS) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index 3d016973ba..68647a8333 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -16,34 +16,69 @@ #include "tsdbSttFWriter.h" #include "tsdbUtil.h" +extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); +extern void tsdbCloseFile(STsdbFD **ppFD); +extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); +extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size); +extern int32_t tsdbFsyncFile(STsdbFD *pFD); + struct SSttFWriter { - SSttFWriterConf conf; - SBlockData bData; - SDelBlock dData; - SArray *aSttBlk; // SArray - SArray *aDelBlk; // SArray - SSkmInfo skmTb; - SSkmInfo skmRow; - STsdbFD *pFd; + SSttFWriterConf config; + // time-series data + SBlockData bData; + SSttBlk sttBlk; + SArray *aSttBlk; // SArray + // tombstone data + SDelBlock dData; + SArray *aDelBlk; // SArray + // helper data + SSkmInfo skmTb; + SSkmInfo skmRow; + STsdbFD *pFd; }; +static int32_t tsdbSttFWriteTSBlock(SSttFWriter *pWriter) { + int32_t code = 0; + int32_t lino; + + SBlockData *pBData = &pWriter->bData; + + // compress data block + code = tCmprBlockData(pBData, pWriter->config.cmprAlg, NULL, NULL, NULL /* TODO */, NULL /* TODO */); + TSDB_CHECK_CODE(code, lino, _exit); + + TSDB_CHECK_NULL(taosArrayPush(pWriter->aSttBlk, &pWriter->sttBlk), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY); + + tBlockDataClear(pBData); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + return code; +} + int32_t tsdbSttFWriterOpen(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) { int32_t code = 0; - int32_t lino = 0; + int32_t lino; - ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); - if (ppWriter[0] == NULL) { + if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - ppWriter[0]->conf = pConf[0]; - if (ppWriter[0]->conf.pSkmTb == NULL) ppWriter[0]->conf.pSkmTb = &ppWriter[0]->skmTb; - if (ppWriter[0]->conf.pSkmRow == NULL) ppWriter[0]->conf.pSkmRow = &ppWriter[0]->skmRow; + ppWriter[0]->config = pConf[0]; + if (ppWriter[0]->config.pSkmTb == NULL) ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb; + if (ppWriter[0]->config.pSkmRow == NULL) ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow; tBlockDataCreate(&ppWriter[0]->bData); // tDelBlockCreate(&ppWriter[0]->dData); + int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO + code = tsdbOpenFile(NULL /* TODO */, pConf->szPage, flag, &ppWriter[0]->pFd); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -63,7 +98,41 @@ int32_t tsdbSttFWriterClose(SSttFWriter **ppWriter) { int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { int32_t code = 0; - // TODO + int32_t lino; + + if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { + if (pWriter->bData.nRow > 0) { + code = tsdbSttFWriteTSBlock(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // TODO: code = tsdbUpdateTableSchema(pWriter->config.pTsdb, tbid->uid, tbid->suid, pWriter->config.pSkmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + TABLEID id = {.suid = tbid->suid, .uid = tbid->suid ? 0 : tbid->uid}; + code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (pRow->type == TSDBROW_ROW_FMT) { + // TODO: code = tsdbUpdateRowSchema(pWriter->config.pTsdb, tbid->uid, tbid->suid, pRow->row, + // pWriter->config.pSkmRow); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pWriter->bData.nRow >= pWriter->config.maxRow) { + code = tsdbSttFWriteTSBlock(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } return code; } @@ -71,108 +140,4 @@ int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelD int32_t code = 0; // TODO return code; -} - -#if 0 -extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); -extern void tsdbCloseFile(STsdbFD **ppFD); -struct SSttFWriter { - STsdb *pTsdb; - SSttFile file; - SBlockData bData; - SArray *aSttBlk; - STsdbFD *pFd; - SSkmInfo *pSkmTb; - SSkmInfo *pSkmRow; - int32_t maxRow; -}; - -static int32_t tsdbSttFWriteTSDataBlock(SSttFWriter *pWriter) { - int32_t code = 0; - // TODO - return code; -} - -int32_t tsdbSttFWriterOpen(STsdb *pTsdb, const SSttFile *pSttFile, SSttFWriter **ppWriter) { - int32_t code = 0; - int32_t lino = 0; - int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; - int32_t flag = TD_FILE_READ | TD_FILE_WRITE; - char fname[TSDB_FILENAME_LEN]; - - ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); - if (ppWriter[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - ppWriter[0]->pTsdb = pTsdb; - ppWriter[0]->file = pSttFile[0]; - - tBlockDataCreate(&ppWriter[0]->bData); - ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk)); - if (ppWriter[0]->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pSttFile->size == 0) flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); - tsdbSttFileName(pTsdb, (SDiskID){0}, 0, &ppWriter[0]->file, fname); // TODO - code = tsdbOpenFile(fname, szPage, flag, &ppWriter[0]->pFd); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - if (ppWriter[0]) { - tBlockDataDestroy(&ppWriter[0]->bData); - taosArrayDestroy(ppWriter[0]->aSttBlk); - taosMemoryFree(ppWriter[0]); - ppWriter[0] = NULL; - } - } - return 0; -} - -int32_t tsdbSttFWriterClose(SSttFWriter *pWriter) { - // TODO - return 0; -} - -int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { - int32_t code = 0; - int32_t lino = 0; - - if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { - if (pWriter->bData.nRow > 0) { - code = tsdbSttFWriteTSDataBlock(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataInit(&pWriter->bData, tbid, NULL /* TODO */, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL /* TODO */, tbid->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pWriter->bData.nRow >= pWriter->maxRow) { - code = tsdbSttFWriteTSDataBlock(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return 0; -} - -int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, int64_t suid, int64_t uid, int64_t version) { - int32_t code = 0; - int32_t lino = 0; - - // TODO write row - return 0; -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h index 08aa59716c..2e153da38a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h @@ -35,6 +35,8 @@ struct SSttFWriterConf { SSkmInfo *pSkmTb; SSkmInfo *pSkmRow; int32_t maxRow; + int32_t szPage; + int8_t cmprAlg; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h index dd829182b3..07c3a5d7ed 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h @@ -27,11 +27,14 @@ typedef struct SDelBlock SDelBlock; typedef struct SDelBlk SDelBlk; /* Exposed APIs */ +int32_t tDelBlockCreate(SDelBlock *pDelBlock); +int32_t tDelBlockDestroy(SDelBlock *pDelBlock); +int32_t tDelBlockClear(SDelBlock *pDelBlock); +int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData); /* Exposed Structs */ struct SDelBlock { - // - SColData aColData[4]; + SColData aColData[4]; // }; struct SDelBlk { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 75d12a3b32..aaa2e1dc73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -141,7 +141,7 @@ _exit: return code; } -static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) { +int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); @@ -173,7 +173,7 @@ _exit: return code; } -static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { +int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t n = 0; int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); @@ -202,7 +202,7 @@ _exit: return code; } -static int32_t tsdbFsyncFile(STsdbFD *pFD) { +int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; code = tsdbWriteFilePage(pFD);