TD-100
This commit is contained in:
parent
ac209abb50
commit
d28fae1a45
|
@ -111,7 +111,6 @@ typedef struct SDataCol {
|
||||||
int len;
|
int len;
|
||||||
int offset;
|
int offset;
|
||||||
void * pData; // Original data
|
void * pData; // Original data
|
||||||
void * pCData; // Compressed data
|
|
||||||
} SDataCol;
|
} SDataCol;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -133,6 +132,7 @@ typedef struct {
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
||||||
void tdResetDataCols(SDataCols *pCols);
|
void tdResetDataCols(SDataCols *pCols);
|
||||||
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||||
|
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||||
void tdFreeDataCols(SDataCols *pCols);
|
void tdFreeDataCols(SDataCols *pCols);
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
|
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
|
||||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
|
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "dataformat.h"
|
#include "dataformat.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
static int tdFLenFromSchema(STSchema *pSchema);
|
static int tdFLenFromSchema(STSchema *pSchema);
|
||||||
|
|
||||||
|
@ -338,6 +339,27 @@ void tdFreeDataCols(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
||||||
|
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
|
||||||
|
if (pRet == NULL) return NULL;
|
||||||
|
|
||||||
|
pRet->numOfCols = pDataCols->numOfCols;
|
||||||
|
pRet->sversion = pDataCols->sversion;
|
||||||
|
if (keepData) pRet->numOfPoints = pDataCols->numOfPoints;
|
||||||
|
|
||||||
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
|
pRet->cols[i].type = pDataCols->cols[i].type;
|
||||||
|
pRet->cols[i].colId = pDataCols->cols[i].colId;
|
||||||
|
pRet->cols[i].bytes = pDataCols->cols[i].bytes;
|
||||||
|
pRet->cols[i].len = pDataCols->cols[i].len;
|
||||||
|
pRet->cols[i].offset = pDataCols->cols[i].offset;
|
||||||
|
|
||||||
|
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].len);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pRet;
|
||||||
|
}
|
||||||
|
|
||||||
void tdResetDataCols(SDataCols *pCols) {
|
void tdResetDataCols(SDataCols *pCols) {
|
||||||
pCols->numOfPoints = 0;
|
pCols->numOfPoints = 0;
|
||||||
for (int i = 0; i < pCols->maxCols; i++) {
|
for (int i = 0; i < pCols->maxCols; i++) {
|
||||||
|
@ -384,5 +406,45 @@ static int tdFLenFromSchema(STSchema *pSchema) {
|
||||||
|
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
||||||
// TODO
|
// TODO
|
||||||
|
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
|
||||||
|
|
||||||
|
SDataCols *pTarget = tdDupDataCols(target, true);
|
||||||
|
if (pTarget == NULL) goto _err;
|
||||||
|
|
||||||
|
int iter1 = 0;
|
||||||
|
int iter2 = 0;
|
||||||
|
while (true) {
|
||||||
|
if (iter1 >= pTarget->numOfPoints) {
|
||||||
|
// TODO: merge the source part
|
||||||
|
int rowsLeft = source->numOfPoints - iter2;
|
||||||
|
if (rowsLeft > 0) {
|
||||||
|
for (int i = 0; i < source->numOfCols; i++) {
|
||||||
|
ASSERT(target->cols[i].type == source->cols[i].type);
|
||||||
|
|
||||||
|
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
|
||||||
|
(void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2),
|
||||||
|
TYPE_BYTES[target->cols[i].type] * rowsLeft);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter2 >= source->numOfPoints) {
|
||||||
|
// TODO: merge the pTemp part
|
||||||
|
int rowsLeft = pTarget->numOfPoints - iter1;
|
||||||
|
if (rowsLeft > 0) {
|
||||||
|
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
return -1;
|
||||||
}
|
}
|
|
@ -43,6 +43,7 @@ static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SData
|
||||||
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||||
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
|
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
|
||||||
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||||
|
static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey);
|
||||||
|
|
||||||
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) {
|
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) {
|
||||||
if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1;
|
if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1;
|
||||||
|
@ -403,7 +404,7 @@ static int comparColIdDataCol(const void *arg1, const void *arg2) {
|
||||||
|
|
||||||
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
|
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
|
||||||
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
|
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
|
||||||
if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset) < 0) return -1;
|
if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1;
|
||||||
if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1;
|
if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -440,19 +441,38 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx,
|
||||||
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||||
|
|
||||||
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
|
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
|
||||||
if (pCompBlock->numOfSubBlocks == 1) {
|
|
||||||
|
|
||||||
|
int numOfSubBlocks = pCompBlock->numOfSubBlocks;
|
||||||
|
SCompBlock *pStartBlock =
|
||||||
|
(numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset);
|
||||||
|
|
||||||
|
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
|
||||||
|
for (int i = 1; i < numOfSubBlocks; i++) {
|
||||||
|
pStartBlock++;
|
||||||
|
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
|
||||||
|
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load the whole block data
|
||||||
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
||||||
// TODO
|
int16_t *colIds = (int16_t *)calloc(pDataCols->numOfCols, sizeof(int16_t));
|
||||||
|
if (colIds == NULL) goto _err;
|
||||||
|
|
||||||
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
|
colIds[i] = pDataCols->cols[i].colId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbLoadBlockDataCols(pHelper, pDataCols, blkIdx, colIds, pDataCols->numOfCols) < 0) goto _err;
|
||||||
|
|
||||||
|
tfree(colIds);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tfree(colIds);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
|
static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
|
||||||
|
@ -672,7 +692,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
|
|
||||||
int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast);
|
int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast);
|
||||||
int maxRowsCanMerge =
|
int maxRowsCanMerge =
|
||||||
MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, keyLimit));
|
MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, 0, keyLimit));
|
||||||
|
|
||||||
if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) {
|
if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) {
|
||||||
// Need to load the block and split as two super block
|
// Need to load the block and split as two super block
|
||||||
|
|
Loading…
Reference in New Issue