TD-166
This commit is contained in:
parent
d388a8b75c
commit
db14d1e00c
|
@ -447,13 +447,11 @@ typedef struct {
|
||||||
|
|
||||||
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
||||||
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
||||||
// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
|
|
||||||
void tsdbDestroyHelper(SRWHelper *pHelper);
|
void tsdbDestroyHelper(SRWHelper *pHelper);
|
||||||
void tsdbResetHelper(SRWHelper *pHelper);
|
void tsdbResetHelper(SRWHelper *pHelper);
|
||||||
|
|
||||||
// --------- For set operations
|
// --------- For set operations
|
||||||
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
|
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
|
||||||
// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
|
|
||||||
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
|
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
|
||||||
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
|
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,11 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
|
||||||
// Init block part
|
// Init block part
|
||||||
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
|
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
|
||||||
|
|
||||||
|
pHelper->blockBuffer =
|
||||||
|
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols +
|
||||||
|
pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM));
|
||||||
|
if (pHelper->blockBuffer == NULL) goto _err;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -149,6 +154,8 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||||
|
|
||||||
void tsdbDestroyHelper(SRWHelper *pHelper) {
|
void tsdbDestroyHelper(SRWHelper *pHelper) {
|
||||||
if (pHelper) {
|
if (pHelper) {
|
||||||
|
tzfree(pHelper->blockBuffer);
|
||||||
|
tzfree(pHelper->compBuffer);
|
||||||
tsdbDestroyHelperFile(pHelper);
|
tsdbDestroyHelperFile(pHelper);
|
||||||
tsdbDestroyHelperTable(pHelper);
|
tsdbDestroyHelperTable(pHelper);
|
||||||
tsdbDestroyHelperBlock(pHelper);
|
tsdbDestroyHelperBlock(pHelper);
|
||||||
|
@ -563,6 +570,8 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
||||||
void *pStart = NULL;
|
void *pStart = NULL;
|
||||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints;
|
pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints;
|
||||||
|
} else {
|
||||||
|
pStart = pDataCol->pData;
|
||||||
}
|
}
|
||||||
// TODO: get rid of INT32_MAX here
|
// TODO: get rid of INT32_MAX here
|
||||||
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfPoints, pStart,
|
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfPoints, pStart,
|
||||||
|
@ -597,8 +606,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
||||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
|
|
||||||
pHelper->blockBuffer = trealloc(pHelper->blockBuffer, pCompBlock->len);
|
ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len);
|
||||||
if (pHelper->blockBuffer == NULL) return -1;
|
|
||||||
|
|
||||||
SCompData *pCompData = (SCompData *)pHelper->blockBuffer;
|
SCompData *pCompData = (SCompData *)pHelper->blockBuffer;
|
||||||
|
|
||||||
|
@ -627,9 +635,13 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
||||||
|
|
||||||
if (pCompCol->colId == pDataCol->colId) {
|
if (pCompCol->colId == pDataCol->colId) {
|
||||||
|
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
||||||
|
pHelper->compBuffer = trealloc(pHelper->compBuffer, pCompCol->len + COMP_OVERFLOW_BYTES);
|
||||||
|
if (pHelper->compBuffer == NULL) goto _err;
|
||||||
|
}
|
||||||
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
||||||
pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, pHelper->compBuffer,
|
pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints,
|
||||||
tsizeof(pHelper->compBuffer)) < 0)
|
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
|
||||||
goto _err;
|
goto _err;
|
||||||
dcol++;
|
dcol++;
|
||||||
ccol++;
|
ccol++;
|
||||||
|
@ -731,6 +743,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
|
|
||||||
// TODO: compresee the data
|
// TODO: compresee the data
|
||||||
if (pHelper->config.compress) {
|
if (pHelper->config.compress) {
|
||||||
|
if (pHelper->config.compress == TWO_STAGE_COMP) {
|
||||||
|
pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
|
||||||
|
if (pHelper->compBuffer == NULL) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
|
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
|
||||||
(char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress,
|
(char *)pStart, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, pHelper->config.compress,
|
||||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
||||||
|
|
Loading…
Reference in New Issue