From 2a41f1d3f6566539ff6b8440241f1fb38a3e1c19 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 27 Jun 2022 06:07:45 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 14 ++- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 41 ++++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 105 ++++++++++++++++++ 3 files changed, 150 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index aea5b3ccc1..a7831f301c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -86,8 +86,8 @@ typedef struct STsdbFSState STsdbFSState; #define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow]) #define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow) #define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)}) -#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}); -#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}); +#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}) +#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}) void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); @@ -132,6 +132,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs); void tColDataReset(SColData *pColData, int16_t cid, int8_t type); void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); +int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataPCmprFn(const void *p1, const void *p2); // SBlockData @@ -142,6 +143,8 @@ void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); +int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); +int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -374,9 +377,9 @@ typedef struct { int64_t nRow; int8_t cmprAlg; int64_t offset; - int64_t vsize; // VERSION size - int64_t ksize; // TSKEY size - int64_t bsize; + int64_t vsize; // VERSION size + int64_t ksize; // TSKEY size + int64_t bsize; // total block size SMapData mBlockCol; // SMapData } SSubBlock; @@ -412,7 +415,6 @@ struct SColData { int32_t *aOffset; int32_t nData; uint8_t *pData; - uint8_t *pBuf; }; struct SBlockData { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index b88a88de59..fb500ca1e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,7 +607,9 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, int64_t n; TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SBlockCol *pBlockCol = &(SBlockCol){}; + SBlockCol *pBlockCol = &(SBlockCol){0}; + + tBlockDataReset(pBlockData); // realloc code = tsdbRealloc(ppBuf1, pSubBlock->bsize); @@ -789,11 +791,42 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p if (code) goto _err; // read remain block data and do merg - iSubBlock++; - for (; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - ASSERT(0); + if (pBlock->nSubBlock > 1) { + SBlockData *pBlockData1 = &(SBlockData){0}; + SBlockData *pBlockData2 = &(SBlockData){0}; + + for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } + + code = tBlockDataCopy(pBlockData, pBlockData2); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } + + // merge two block data + code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } + } + + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); } + ASSERT(pBlock->nRow == pBlockData->nRow); + ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0); + ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0); + if (pBuf1) tsdbFree(pBuf1); if (pBuf2) tsdbFree(pBuf2); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index b1c20a4340..424f7fc5e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -939,6 +939,29 @@ _exit: return code; } +int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) { + int32_t code = 0; + + pColDataDest->cid = pColDataDest->cid; + pColDataDest->type = pColDataDest->type; + pColDataDest->offsetValid = 0; + pColDataDest->nVal = pColDataSrc->nVal; + pColDataDest->flag = pColDataSrc->flag; + if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) { + code = tsdbRealloc(&pColDataDest->pBitMap, BIT2_SIZE(pColDataDest->nVal)); + if (code) goto _exit; + + memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, BIT2_SIZE(pColDataSrc->nVal)); + } + pColDataDest->nData = pColDataSrc->nData; + code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData); + if (code) goto _exit; + memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataSrc->nData); + +_exit: + return code; +} + static int32_t tColDataUpdateOffset(SColData *pColData) { int32_t code = 0; SValue value; @@ -1169,3 +1192,85 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS _err: return code; } + +int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData) { + int32_t code = 0; + + tBlockDataReset(pBlockData); + + // loop to merge + int32_t iRow1 = 0; + int32_t nRow1 = pBlockData1->nRow; + int32_t iRow2 = 0; + int32_t nRow2 = pBlockData2->nRow; + TSDBROW row1; + TSDBROW row2; + int32_t c; + + while (iRow1 < nRow1 && iRow2 < nRow2) { + row1 = tsdbRowFromBlockData(pBlockData1, iRow1); + row2 = tsdbRowFromBlockData(pBlockData2, iRow2); + + c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2)); + if (c < 0) { + code = tBlockDataAppendRow(pBlockData, &row1, NULL); + if (code) goto _exit; + iRow1++; + } else if (c > 0) { + code = tBlockDataAppendRow(pBlockData, &row2, NULL); + if (code) goto _exit; + iRow2++; + } else { + ASSERT(0); + } + } + + while (iRow1 < nRow1) { + row1 = tsdbRowFromBlockData(pBlockData1, iRow1); + code = tBlockDataAppendRow(pBlockData, &row1, NULL); + if (code) goto _exit; + iRow1++; + } + + while (iRow2 < nRow2) { + row2 = tsdbRowFromBlockData(pBlockData2, iRow2); + code = tBlockDataAppendRow(pBlockData, &row2, NULL); + if (code) goto _exit; + iRow2++; + } + +_exit: + return code; +} + +int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) { + int32_t code = 0; + SColData *pColDataSrc; + SColData *pColDataDest; + + ASSERT(pBlockDataSrc->nRow > 0); + + tBlockDataReset(pBlockDataDest); + + pBlockDataDest->nRow = pBlockDataSrc->nRow; + // TSDBKEY + code = tsdbRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow); + if (code) goto _exit; + code = tsdbRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow); + if (code) goto _exit; + memcpy(pBlockDataDest->aVersion, pBlockDataSrc->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow); + memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow); + + // other + for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aColDataP); iColData++) { + pColDataSrc = (SColData *)taosArrayGetP(pBlockDataSrc->aColDataP, iColData); + code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest); + if (code) goto _exit; + + code = tColDataCopy(pColDataSrc, pColDataDest); + if (code) goto _exit; + } + +_exit: + return code; +} \ No newline at end of file