Merge remote-tracking branch 'origin/feature/2.0tsdb' into feature/2.0tsdb
This commit is contained in:
commit
273c73b6c8
|
@ -371,12 +371,14 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
|
||||
if (keepData) {
|
||||
pRet->cols[i].len = pDataCols->cols[i].len;
|
||||
if (pDataCols->cols[i].len > 0) {
|
||||
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
|
||||
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pRet;
|
||||
}
|
||||
|
@ -443,9 +445,11 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
|||
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
|
||||
for (int i = 0; i < rowsToMerge; i++) {
|
||||
for (int j = 0; j < source->numOfCols; j++) {
|
||||
if (source->cols[j].len > 0) {
|
||||
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
}
|
||||
target->numOfRows++;
|
||||
}
|
||||
} else {
|
||||
|
@ -479,9 +483,11 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
|
|||
if (key1 <= key2) {
|
||||
for (int i = 0; i < src1->numOfCols; i++) {
|
||||
ASSERT(target->cols[i].type == src1->cols[i].type);
|
||||
if (src1->cols[i].len > 0) {
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
}
|
||||
|
||||
target->numOfRows++;
|
||||
(*iter1)++;
|
||||
|
@ -489,9 +495,11 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
|
|||
} else {
|
||||
for (int i = 0; i < src2->numOfCols; i++) {
|
||||
ASSERT(target->cols[i].type == src2->cols[i].type);
|
||||
if (src2->cols[i].len > 0) {
|
||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
||||
target->maxPoints);
|
||||
}
|
||||
}
|
||||
|
||||
target->numOfRows++;
|
||||
(*iter2)++;
|
||||
|
|
|
@ -381,8 +381,8 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
|||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, int16_t* colIds, int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock);
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
#define REPO_ID(r) (r)->config.tsdbId
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
#include "tscompression.h"
|
||||
#include "tsdbMain.h"
|
||||
|
||||
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
||||
|
||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
|
||||
|
@ -42,17 +44,16 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper);
|
|||
static void tsdbResetHelperBlock(SRWHelper *pHelper);
|
||||
static int tsdbInitHelperBlock(SRWHelper *pHelper);
|
||||
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
|
||||
static int comparColIdCompCol(const void *arg1, const void *arg2);
|
||||
static int comparColIdDataCol(const void *arg1, const void *arg2);
|
||||
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf);
|
||||
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
|
||||
SDataCols *pDataCols);
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
|
||||
int maxPoints, char *buffer, int bufferSize);
|
||||
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||
int numOfColIds);
|
||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
|
||||
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
|
||||
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
|
||||
static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
|
||||
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
|
||||
SDataCol *pDataCol);
|
||||
|
||||
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
||||
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||
|
@ -315,7 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
ASSERT(pCompBlock->last);
|
||||
|
||||
if (pCompBlock->numOfSubBlocks > 1) {
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1)) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
||||
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
|
||||
|
@ -510,14 +511,34 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
|||
|
||||
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
|
||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
|
||||
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||
|
||||
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
|
||||
if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
|
||||
size_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
|
||||
pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize);
|
||||
if (pHelper->pCompData == NULL) return -1;
|
||||
if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1;
|
||||
if (pHelper->pCompData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tread(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
|
||||
tsdbError("vgId:%d failed to read %zu bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompData, tsize)) {
|
||||
tsdbError("vgId:%d file %s is broken, offset %" PRId64 " size %zu", REPO_ID(pHelper->pRepo), pFile->fname,
|
||||
(int64_t)pCompBlock->offset, tsize);
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);
|
||||
|
||||
|
@ -554,30 +575,31 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols)
|
|||
}
|
||||
}
|
||||
|
||||
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
|
||||
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||
|
||||
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds) {
|
||||
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
|
||||
|
||||
int numOfSubBlocks = pCompBlock->numOfSubBlocks;
|
||||
SCompBlock *pStartBlock =
|
||||
(numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset);
|
||||
if (numOfSubBlocks > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset);
|
||||
|
||||
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
|
||||
tdResetDataCols(pHelper->pDataCols[0]);
|
||||
if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err;
|
||||
for (int i = 1; i < numOfSubBlocks; i++) {
|
||||
pStartBlock++;
|
||||
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
|
||||
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows);
|
||||
tdResetDataCols(pHelper->pDataCols[1]);
|
||||
pCompBlock++;
|
||||
if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err;
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
|
||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock) {
|
||||
|
||||
int numOfSubBlock = pCompBlock->numOfSubBlocks;
|
||||
if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
|
||||
if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset);
|
||||
|
||||
tdResetDataCols(pHelper->pDataCols[0]);
|
||||
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
|
||||
|
@ -588,8 +610,6 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar
|
|||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
||||
}
|
||||
|
||||
// if (target) TODO
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
|
@ -648,7 +668,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
// Compress the data if neccessary
|
||||
int tcol = 0;
|
||||
int32_t toffset = 0;
|
||||
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
|
||||
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
|
||||
int32_t lsize = tsize;
|
||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
||||
if (tcol >= nColsNotAllNull) break;
|
||||
|
@ -770,7 +790,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else {
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
|
||||
// Merge
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
||||
|
@ -826,7 +846,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else { // Load-Merge-Write
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err;
|
||||
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
|
||||
|
||||
rowsWritten = rows3;
|
||||
|
@ -1183,52 +1203,13 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int comparColIdCompCol(const void *arg1, const void *arg2) {
|
||||
return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
|
||||
}
|
||||
|
||||
static int comparColIdDataCol(const void *arg1, const void *arg2) {
|
||||
return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
|
||||
}
|
||||
|
||||
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
|
||||
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
|
||||
if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1;
|
||||
if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
|
||||
SDataCols *pDataCols) {
|
||||
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
|
||||
|
||||
void *ptr = NULL;
|
||||
for (int i = 0; i < numOfColIds; i++) {
|
||||
int16_t colId = colIds[i];
|
||||
|
||||
ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol),
|
||||
comparColIdCompCol);
|
||||
if (ptr == NULL) continue;
|
||||
SCompCol *pCompCol = (SCompCol *)ptr;
|
||||
|
||||
ptr =
|
||||
bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol);
|
||||
ASSERT(ptr != NULL);
|
||||
SDataCol *pDataCol = (SDataCol *)ptr;
|
||||
|
||||
pDataCol->len = pCompCol->len;
|
||||
if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
|
||||
int maxPoints, char *buffer, int bufferSize) {
|
||||
// Verify by checksum
|
||||
if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;
|
||||
if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Decode the data
|
||||
if (comp) {
|
||||
|
@ -1249,10 +1230,97 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
||||
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
|
||||
SDataCol *pDataCol) {
|
||||
ASSERT(pDataCol->colId == pCompCol->colId);
|
||||
int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompCol->len);
|
||||
if (pHelper->pBuffer == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pHelper->compBuffer = trealloc(pHelper->compBuffer, tsize);
|
||||
if (pHelper->compBuffer == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (lseek(pFile->fd, pCompCol->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tread(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
|
||||
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm,
|
||||
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, pCompCol->colId,
|
||||
(int64_t)pCompCol->offset);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||
|
||||
SCompData *pCompData = (SCompData *)pHelper->pBuffer;
|
||||
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||
|
||||
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
||||
|
||||
int dcol = 0;
|
||||
int ccol = 0;
|
||||
for (int i = 0; i < numOfColIds; i++) {
|
||||
int16_t colId = colIds[i];
|
||||
SDataCol *pDataCol = NULL;
|
||||
SCompCol *pCompCol = NULL;
|
||||
|
||||
while (true) {
|
||||
ASSERT(dcol < pDataCols->numOfCols);
|
||||
pDataCol = &pDataCols->cols[dcol];
|
||||
ASSERT(pDataCol->colId <= colId);
|
||||
if (pDataCol->colId == colId) break;
|
||||
dcol++;
|
||||
}
|
||||
|
||||
ASSERT(pDataCol->colId == colId);
|
||||
|
||||
while (ccol < pCompBlock->numOfCols) {
|
||||
pCompCol = &pHelper->pCompData->cols[ccol];
|
||||
if (pCompCol->colId >= colId) break;
|
||||
ccol++;
|
||||
}
|
||||
|
||||
if (ccol >= pCompBlock->numOfCols || pCompCol->colId > colId) {
|
||||
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
|
||||
dcol++;
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(pCompCol->colId == pDataCol->colId);
|
||||
|
||||
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
|
||||
dcol++;
|
||||
ccol++;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||
|
||||
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||
|
||||
|
@ -1262,6 +1330,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
goto _err;
|
||||
}
|
||||
|
||||
SCompData *pCompData = (SCompData *)pHelper->pBuffer;
|
||||
|
||||
int fd = pFile->fd;
|
||||
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
||||
|
@ -1277,7 +1347,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
}
|
||||
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
|
||||
|
||||
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
|
||||
int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
|
||||
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
|
||||
tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
|
||||
pFile->fname, (int64_t)(pCompBlock->offset), pCompBlock->len);
|
||||
|
@ -1315,8 +1385,11 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
}
|
||||
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
||||
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
|
||||
pCompCol->colId, (int64_t)pCompCol->offset);
|
||||
goto _err;
|
||||
}
|
||||
dcol++;
|
||||
ccol++;
|
||||
} else if (pCompCol->colId < pDataCol->colId) {
|
||||
|
|
|
@ -599,7 +599,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
|||
|
||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
|
||||
|
||||
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
|
||||
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) {
|
||||
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||
|
||||
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
|
||||
|
|
Loading…
Reference in New Issue