From 555469c77570f452409bd3164f3532aad89975eb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 14 Jun 2022 13:19:46 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 16 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 462 ++++++++---------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 153 +++--- source/dnode/vnode/src/tsdb/tsdbUtil.c | 113 +---- 4 files changed, 307 insertions(+), 437 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 191785895a..0560b6060a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -42,7 +42,6 @@ typedef struct SMemTable SMemTable; typedef struct STbDataIter STbDataIter; typedef struct SMergeInfo SMergeInfo; typedef struct STable STable; -typedef struct SOffset SOffset; typedef struct SMapData SMapData; typedef struct SColData SColData; typedef struct SColDataBlock SColDataBlock; @@ -93,10 +92,11 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); // SDataFWriter typedef struct SDataFWriter SDataFWriter; -int32_t tsdbDataFWriterOpen(SDataFWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); -int32_t tsdbDataFWriterClose(SDataFWriter *pWriter); +int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync); +int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize); +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize); int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); @@ -104,7 +104,7 @@ int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t * // SDataFReader typedef struct SDataFReader SDataFReader; -int32_t tsdbDataFReaderOpen(SDataFReader *pReader, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader *pReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); @@ -165,12 +165,6 @@ int32_t tPutDelFileHdr(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile); // structs -struct SOffset { - int32_t nOffset; - uint8_t flag; - uint8_t *pOffset; -}; - typedef struct { int minFid; int midFid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b440dac24f..73fb117ca6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -16,34 +16,24 @@ #include "tsdb.h" typedef struct { - STsdb *pTsdb; - uint8_t *pBuf1; - uint8_t *pBuf2; - uint8_t *pBuf3; - uint8_t *pBuf4; - uint8_t *pBuf5; + STsdb *pTsdb; /* commit data */ int32_t minutes; int8_t precision; int32_t minRow; int32_t maxRow; + // -------------- + TSKEY nextKey; + int32_t commitFid; + TSKEY minKey; + TSKEY maxKey; // commit file data - TSKEY nextKey; - int32_t commitFid; - TSKEY minKey; - TSKEY maxKey; SDataFReader *pReader; SMapData oBlockIdx; // SMapData, read from reader + SMapData oBlock; // SMapData, read from reader SDataFWriter *pWriter; SMapData nBlockIdx; // SMapData, build by committer - // commit table data - STbDataIter iter; - STbDataIter *pIter; - SBlockIdx *pBlockIdx; - SMapData oBlock; - SMapData nBlock; - SColDataBlock oColDataBlock; - SColDataBlock nColDataBlock; + SMapData nBlock; // SMapData /* commit del */ SDelFReader *pDelFReader; SMapData oDelIdxMap; // SMapData, old @@ -124,21 +114,6 @@ _err: return code; } -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { - int32_t code = 0; - - memset(pCommitter, 0, sizeof(*pCommitter)); - ASSERT(pTsdb->mem && pTsdb->imem == NULL); - // lock(); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - // unlock(); - - pCommitter->pTsdb = pTsdb; - - return code; -} - static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -371,15 +346,192 @@ _err: return code; } -static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { - int32_t code = 0; - // TODO +static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { + int32_t code = 0; + STbDataIter iter; + TSDBROW row; + SBlockIdx blockIdx; + + // check: if no memory data and no disk data, exit + if (pTbData) { + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &iter); + if ((!tsdbTbDataIterGet(&iter, &row) || row.pTSRow->ts > pCommitter->maxKey) && pBlockIdx == NULL) { + goto _exit; + } + } + + // start + tMapDataReset(&pCommitter->oBlock); + tMapDataReset(&pCommitter->nBlock); + if (pBlockIdx) { + code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL); + if (code) goto _err; + } + + // impl + + // end + +_exit: + return code; + +_err: + tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter); -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); -static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter); +static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SDFileSet *pRSet = NULL; // TODO + SDFileSet *pWSet = NULL; // TODO + + // memory + pCommitter->nextKey = TSKEY_MAX; + tMapDataReset(&pCommitter->oBlockIdx); + tMapDataReset(&pCommitter->oBlock); + tMapDataReset(&pCommitter->nBlockIdx); + tMapDataReset(&pCommitter->nBlock); + + // load old + if (pRSet) { + code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); + if (code) goto _err; + + code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx, NULL); + if (code) goto _err; + } + + // create new + code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); + if (code) goto _err; + +_exit: + return code; + +_err: + tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { + int32_t code = 0; + int32_t c; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + int32_t iTbData = 0; + int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); + int32_t iBlockIdx = 0; + int32_t nBlockIdx = pCommitter->oBlockIdx.nItem; + STbData *pTbData; + SBlockIdx *pBlockIdx = NULL; + SBlockIdx blockIdx; + + ASSERT(nTbData > 0); + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + if (iBlockIdx < nBlockIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); + if (code) goto _err; + pBlockIdx = &blockIdx; + } + + while (true) { + if (pTbData == NULL && pBlockIdx == NULL) break; + + if (pTbData && pBlockIdx) { + c = tTABLEIDCmprFn(pTbData, pBlockIdx); + + if (c == 0) { + goto _commit_mem_and_disk_data; + } else if (c < 0) { + goto _commit_mem_data; + } else { + goto _commit_disk_data; + } + } else if (pTbData) { + goto _commit_mem_data; + } else { + goto _commit_disk_data; + } + + _commit_mem_data: + code = tsdbCommitTableData(pCommitter, pTbData, NULL); + if (code) goto _err; + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + continue; + + _commit_disk_data: + code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); + if (code) goto _err; + iBlockIdx++; + if (iBlockIdx < nBlockIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); + if (code) goto _err; + pBlockIdx = &blockIdx; + } else { + pBlockIdx = NULL; + } + continue; + + _commit_mem_and_disk_data: + code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx); + if (code) goto _err; + iTbData++; + iBlockIdx++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + if (iBlockIdx < nBlockIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, tGetBlockIdx); + if (code) goto _err; + pBlockIdx = &blockIdx; + } else { + pBlockIdx = NULL; + } + continue; + } + + return code; + +_err: + tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { + int32_t code = 0; + + // write blockIdx + code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdx, NULL); + if (code) goto _err; + + // update file header + code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL); + if (code) goto _err; + + // close and sync + code = tsdbDataFWriterClose(pCommitter->pWriter, 1); + if (code) goto _err; + + if (pCommitter->pReader) { + code = tsdbDataFReaderClose(pCommitter->pReader); + goto _err; + } + +_exit: + return code; + +_err: + tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t code = 0; @@ -408,227 +560,19 @@ _err: return code; } -static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; // TODO - SDFileSet *pWSet = NULL; // TODO - - // memory - tMapDataReset(&pCommitter->oBlockIdx); - tMapDataReset(&pCommitter->nBlockIdx); - - // load old - if (pRSet) { - code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet); - if (code) goto _err; - - code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx); - if (code) goto _err; - } - - // create new - code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); - if (code) goto _err; - -_exit: - return code; - -_err: - tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitTableData(SCommitter *pCommitter); - -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); - int32_t iBlockIdx = 0; - int32_t nBlockIdx = pCommitter->oBlockIdx.nItem; - STbData *pTbData; - SBlockIdx *pBlockIdx; - SBlockIdx blockIdx; - int32_t c; - - while (iTbData < nTbData || iBlockIdx < nBlockIdx) { - pTbData = NULL; - pBlockIdx = NULL; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, NULL /* TODO */); - pBlockIdx = &blockIdx; - } - - if (pTbData && pBlockIdx) { - c = tTABLEIDCmprFn(pTbData, pBlockIdx); - - if (c == 0) { - iTbData++; - iBlockIdx++; - } else if (c < 0) { - iTbData++; - pBlockIdx = NULL; - } else { - iBlockIdx++; - pTbData = NULL; - } - } else { - if (pTbData) { - iBlockIdx++; - } - if (pBlockIdx) { - iTbData++; - } - } - - if (pTbData && - !tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pCommitter->iter)) { - pTbData = NULL; - } - - if (pTbData == NULL && pBlockIdx == NULL) continue; - - pCommitter->pTbData = pTbData; - pCommitter->pBlockIdx = pBlockIdx; - - code = tsdbCommitTableData(pCommitter); - if (code) goto _err; - } - - return code; - -_err: - return code; -} - -static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { +// ---------------------------------------------------------------------------- +static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { int32_t code = 0; - code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->nBlockIdx, NULL); - if (code) goto _err; + memset(pCommitter, 0, sizeof(*pCommitter)); + ASSERT(pTsdb->mem && pTsdb->imem == NULL); + // lock(); + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; + // unlock(); - code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL); - if (code) goto _err; + pCommitter->pTsdb = pTsdb; - code = tsdbDFileSetWriterClose(pCommitter->pWriter, 1); - if (code) goto _err; - - if (pCommitter->pReader) { - code = tsdbDFileSetReaderClose(pCommitter->pReader); - goto _err; - } - -_exit: - return code; - -_err: - return code; -} - -static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter); -static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter); -static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter); - -static int32_t tsdbCommitTableData(SCommitter *pCommitter) { - int32_t code = 0; - - // start - code = tsdbCommitTableDataStart(pCommitter); - if (code) { - goto _err; - } - - // impl - code = tsdbCommitTableDataImpl(pCommitter); - if (code) { - goto _err; - } - - // end - code = tsdbCommitTableDataEnd(pCommitter); - if (code) { - goto _err; - } - -_exit: - return code; - -_err: - return code; -} - -static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) { - int32_t code = 0; - - // old - tMapDataReset(&pCommitter->oBlock); - if (pCommitter->pBlockIdx) { - code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL); - if (code) goto _err; - } - - // new - tMapDataReset(&pCommitter->nBlock); - -_err: - return code; -} - -static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - STbDataIter *pIter = NULL; - int32_t iBlock = 0; - int32_t nBlock = pCommitter->nBlock.nItem; - SBlock *pBlock; - SBlock block; - TSDBROW *pRow; - TSDBROW row; - int32_t iRow = 0; - STSchema *pTSchema = NULL; - - if (pCommitter->pTbData) { - code = tsdbTbDataIterCreate(pCommitter->pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pIter); - if (code) goto _err; - } - - if (iBlock < nBlock) { - pBlock = █ - } else { - pBlock = NULL; - } - - tsdbTbDataIterGet(pIter, pRow); - - // loop to merge memory data and disk data - for (; pBlock == NULL || (pRow && pRow->pTSRow->ts <= pCommitter->maxKey);) { - if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) { - // only has block data, then move to new index file - } else if (0) { - // only commit memory data - } else { - // merge memory and block data - } - } - - tsdbTbDataIterDestroy(pIter); - return code; - -_err: - tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - tsdbTbDataIterDestroy(pIter); - return code; -} - -static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) { - int32_t code = 0; - // TODO return code; } @@ -702,3 +646,9 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter) { // TODO return code; } + +static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { + int32_t code = 0; + // TODO + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 45ffd42e86..733d1ad8e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -18,64 +18,6 @@ #define TSDB_FHDR_SIZE 512 #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) -// SDFileSetWritter ==================================================== -struct SDFileSetWritter { - STsdb *pTsdb; - int32_t szBuf1; - uint8_t *pBuf1; - int32_t szBuf2; - uint8_t *pBuf2; -}; - -// SDFileSetReader ==================================================== -struct SDFileSetReader { - STsdb *pTsdb; - int32_t szBuf1; - uint8_t *pBuf1; - int32_t szBuf2; - uint8_t *pBuf2; -}; - -int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) { - int32_t code = 0; - - memset(pReader, 0, sizeof(*pReader)); - pReader->pTsdb = pTsdb; - - return code; - -_err: - tsdbError("vgId:%d failed to open SDFileSetReader since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader) { - int32_t code = 0; - - taosMemoryFreeClear(pReader->pBuf1); - taosMemoryFreeClear(pReader->pBuf2); - - return code; -} - -int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray) { - int32_t code = 0; - // TODO - return code; -} - -int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo) { - int32_t code = 0; - // TODO - return code; -} - -int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis) { - int32_t code = 0; - // TODO - return code; -} - // SDelFWriter ==================================================== struct SDelFWriter { STsdb *pTsdb; @@ -439,4 +381,97 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB _err: tsdbError("vgId:%d read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; -} \ No newline at end of file +} + +// SDataFReader ==================================================== +struct SDataFReader { + STsdb *pTsdb; + SDFileSet *pSet; + TdFilePtr pReadH; +}; + +int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbDataFReaderClose(SDataFReader *pReader) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) { + int32_t code = 0; + // TODO + return code; +} + +// SDataFWriter ==================================================== +struct SDataFWriter { + STsdb *pTsdb; + SDFileSet *pSet; + TdFilePtr pWriteH; +}; + +int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset, + int64_t *rSize) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize) { + int32_t code = 0; + // TODO + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index dd358ff3a1..4d3e890095 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -15,121 +15,10 @@ #include "tsdb.h" -// SOffset ======================================================================= #define TSDB_OFFSET_I32 ((uint8_t)0) #define TSDB_OFFSET_I16 ((uint8_t)1) #define TSDB_OFFSET_I8 ((uint8_t)2) -static FORCE_INLINE int32_t tsdbOffsetSize(SOffset *pOfst) { - switch (pOfst->flag) { - case TSDB_OFFSET_I32: - return sizeof(int32_t); - case TSDB_OFFSET_I16: - return sizeof(int16_t); - case TSDB_OFFSET_I8: - return sizeof(int8_t); - default: - ASSERT(0); - } -} - -static FORCE_INLINE int32_t tsdbGetOffset(SOffset *pOfst, int32_t idx) { - int32_t offset = -1; - - if (idx >= 0 && idx < pOfst->nOffset) { - switch (pOfst->flag) { - case TSDB_OFFSET_I32: - offset = ((int32_t *)pOfst->pOffset)[idx]; - break; - case TSDB_OFFSET_I16: - offset = ((int16_t *)pOfst->pOffset)[idx]; - break; - case TSDB_OFFSET_I8: - offset = ((int8_t *)pOfst->pOffset)[idx]; - break; - default: - ASSERT(0); - } - - ASSERT(offset >= 0); - } - - return offset; -} - -static FORCE_INLINE int32_t tsdbAddOffset(SOffset *pOfst, int32_t offset) { - int32_t code = 0; - int32_t nOffset = pOfst->nOffset; - - ASSERT(pOfst->flag == TSDB_OFFSET_I32); - ASSERT(offset >= 0); - - pOfst->nOffset++; - - // alloc - code = tsdbRealloc(&pOfst->pOffset, sizeof(int32_t) * pOfst->nOffset); - if (code) goto _exit; - - // put - ((int32_t *)pOfst->pOffset)[nOffset] = offset; - -_exit: - return code; -} - -static FORCE_INLINE int32_t tPutOffset(uint8_t *p, SOffset *pOfst) { - int32_t n = 0; - int32_t maxOffset; - - ASSERT(pOfst->flag == TSDB_OFFSET_I32); - ASSERT(pOfst->nOffset > 0); - - maxOffset = tsdbGetOffset(pOfst, pOfst->nOffset - 1); - - n += tPutI32v(p ? p + n : p, pOfst->nOffset); - if (maxOffset <= INT8_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8); - for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) { - n += tPutI8(p ? p + n : p, (int8_t)tsdbGetOffset(pOfst, iOffset)); - } - } else if (maxOffset <= INT16_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16); - for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) { - n += tPutI16(p ? p + n : p, (int16_t)tsdbGetOffset(pOfst, iOffset)); - } - } else { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32); - for (int32_t iOffset = 0; iOffset < pOfst->nOffset; iOffset++) { - n += tPutI32(p ? p + n : p, (int32_t)tsdbGetOffset(pOfst, iOffset)); - } - } - - return n; -} - -static FORCE_INLINE int32_t tGetOffset(uint8_t *p, SOffset *pOfst) { - int32_t n = 0; - - n += tGetI32v(p + n, &pOfst->nOffset); - n += tGetU8(p + n, &pOfst->flag); - pOfst->pOffset = p + n; - switch (pOfst->flag) { - case TSDB_OFFSET_I32: - n = n + pOfst->nOffset + sizeof(int32_t); - break; - case TSDB_OFFSET_I16: - n = n + pOfst->nOffset + sizeof(int16_t); - break; - case TSDB_OFFSET_I8: - n = n + pOfst->nOffset + sizeof(int8_t); - break; - default: - ASSERT(0); - } - - return n; -} - // SMapData ======================================================================= void tMapDataReset(SMapData *pMapData) { pMapData->flag = TSDB_OFFSET_I32; @@ -299,6 +188,7 @@ void tsdbFree(uint8_t *pBuf) { } } +// TABLEID ======================================================================= int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { TABLEID *pId1 = (TABLEID *)p1; TABLEID *pId2 = (TABLEID *)p2; @@ -318,6 +208,7 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { return 0; } +// TSDBKEY ======================================================================= int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey2 = (TSDBKEY *)p2;