From d28fae1a45eac92e7141ca275223b8bf28a81228 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 11 Apr 2020 10:20:19 +0800 Subject: [PATCH] TD-100 --- src/common/inc/dataformat.h | 2 +- src/common/src/dataformat.c | 62 +++++++++++++++++++++++++++++++ src/vnode/tsdb/src/tsdbRWHelper.c | 34 +++++++++++++---- 3 files changed, 90 insertions(+), 8 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 231786ff73..783e378eb7 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -111,7 +111,6 @@ typedef struct SDataCol { int len; int offset; void * pData; // Original data - void * pCData; // Compressed data } SDataCol; typedef struct { @@ -133,6 +132,7 @@ typedef struct { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); +SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 1f5d83d9af..fd7dd5b91e 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "dataformat.h" +#include "tutil.h" static int tdFLenFromSchema(STSchema *pSchema); @@ -338,6 +339,27 @@ void tdFreeDataCols(SDataCols *pCols) { } } +SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { + SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); + if (pRet == NULL) return NULL; + + pRet->numOfCols = pDataCols->numOfCols; + pRet->sversion = pDataCols->sversion; + if (keepData) pRet->numOfPoints = pDataCols->numOfPoints; + + for (int i = 0; i < pDataCols->numOfCols; i++) { + pRet->cols[i].type = pDataCols->cols[i].type; + pRet->cols[i].colId = pDataCols->cols[i].colId; + pRet->cols[i].bytes = pDataCols->cols[i].bytes; + pRet->cols[i].len = pDataCols->cols[i].len; + pRet->cols[i].offset = pDataCols->cols[i].offset; + + if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].len); + } + + return pRet; +} + void tdResetDataCols(SDataCols *pCols) { pCols->numOfPoints = 0; for (int i = 0; i < pCols->maxCols; i++) { @@ -384,5 +406,45 @@ static int tdFLenFromSchema(STSchema *pSchema) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { // TODO + ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); + + SDataCols *pTarget = tdDupDataCols(target, true); + if (pTarget == NULL) goto _err; + + int iter1 = 0; + int iter2 = 0; + while (true) { + if (iter1 >= pTarget->numOfPoints) { + // TODO: merge the source part + int rowsLeft = source->numOfPoints - iter2; + if (rowsLeft > 0) { + for (int i = 0; i < source->numOfCols; i++) { + ASSERT(target->cols[i].type == source->cols[i].type); + + memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), + TYPE_BYTES[target->cols[i].type] * rowsLeft); + } + } + break; + } + + if (iter2 >= source->numOfPoints) { + // TODO: merge the pTemp part + int rowsLeft = pTarget->numOfPoints - iter1; + if (rowsLeft > 0) { + + } + break; + } + + + + + } + return 0; + + _err: + return -1; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index fb46deb216..7f1c642ef4 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -43,6 +43,7 @@ static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SData static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey); int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) { if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1; @@ -403,7 +404,7 @@ static int comparColIdDataCol(const void *arg1, const void *arg2) { static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) { size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; - if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset) < 0) return -1; + if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1; if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1; return 0; @@ -440,19 +441,38 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block - if (pCompBlock->numOfSubBlocks == 1) { + int numOfSubBlocks = pCompBlock->numOfSubBlocks; + SCompBlock *pStartBlock = + (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset); + + if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1; + for (int i = 1; i < numOfSubBlocks; i++) { + pStartBlock++; + if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; + tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints); } - - - return 0; } +// Load the whole block data int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { - // TODO + int16_t *colIds = (int16_t *)calloc(pDataCols->numOfCols, sizeof(int16_t)); + if (colIds == NULL) goto _err; + + for (int i = 0; i < pDataCols->numOfCols; i++) { + colIds[i] = pDataCols->cols[i].colId; + } + + if (tsdbLoadBlockDataCols(pHelper, pDataCols, blkIdx, colIds, pDataCols->numOfCols) < 0) goto _err; + + tfree(colIds); return 0; + +_err: + tfree(colIds); + return -1; } static int tsdbCheckHelperCfg(SHelperCfg *pCfg) { @@ -672,7 +692,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast); int maxRowsCanMerge = - MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, keyLimit)); + MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, 0, keyLimit)); if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) { // Need to load the block and split as two super block