move key column into SCompBlock
This commit is contained in:
parent
823761d94c
commit
e5d5782125
|
@ -206,10 +206,10 @@ typedef struct {
|
||||||
int64_t offset : 63;
|
int64_t offset : 63;
|
||||||
int32_t algorithm : 8;
|
int32_t algorithm : 8;
|
||||||
int32_t numOfRows : 24;
|
int32_t numOfRows : 24;
|
||||||
int32_t sversion;
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols
|
||||||
int16_t numOfSubBlocks;
|
int16_t numOfSubBlocks;
|
||||||
int16_t numOfCols;
|
int16_t numOfCols; // not including timestamp column
|
||||||
TSKEY keyFirst;
|
TSKEY keyFirst;
|
||||||
TSKEY keyLast;
|
TSKEY keyLast;
|
||||||
} SCompBlock;
|
} SCompBlock;
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
|
|
||||||
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
||||||
|
#define TSDB_KEY_COL_OFFSET 0
|
||||||
|
|
||||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
||||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
||||||
|
@ -646,7 +647,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
}
|
}
|
||||||
|
|
||||||
int nColsNotAllNull = 0;
|
int nColsNotAllNull = 0;
|
||||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
|
||||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||||
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
|
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
|
||||||
|
|
||||||
|
@ -658,7 +659,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
|
|
||||||
pCompCol->colId = pDataCol->colId;
|
pCompCol->colId = pDataCol->colId;
|
||||||
pCompCol->type = pDataCol->type;
|
pCompCol->type = pDataCol->type;
|
||||||
if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) {
|
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
|
||||||
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
|
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
|
||||||
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
|
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
|
||||||
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
|
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
|
||||||
|
@ -673,17 +674,17 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
int32_t toffset = 0;
|
int32_t toffset = 0;
|
||||||
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
|
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
|
||||||
int32_t lsize = tsize;
|
int32_t lsize = tsize;
|
||||||
|
int32_t keyLen = 0;
|
||||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
||||||
if (tcol >= nColsNotAllNull) break;
|
if (tcol >= nColsNotAllNull) break;
|
||||||
|
|
||||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||||
SCompCol *pCompCol = pCompData->cols + tcol;
|
SCompCol *pCompCol = pCompData->cols + tcol;
|
||||||
|
|
||||||
if (pDataCol->colId != pCompCol->colId) continue;
|
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
|
||||||
void *tptr = (void *)((char *)pCompData + lsize);
|
void *tptr = POINTER_SHIFT(pCompData, lsize);
|
||||||
|
|
||||||
pCompCol->offset = toffset;
|
|
||||||
|
|
||||||
|
int32_t flen = 0; // final length
|
||||||
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
|
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
|
||||||
|
|
||||||
if (pCfg->compression) {
|
if (pCfg->compression) {
|
||||||
|
@ -695,22 +696,29 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
||||||
tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
||||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
||||||
} else {
|
} else {
|
||||||
pCompCol->len = tlen;
|
flen = tlen;
|
||||||
memcpy(tptr, pDataCol->pData, pCompCol->len);
|
memcpy(tptr, pDataCol->pData, flen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add checksum
|
// Add checksum
|
||||||
ASSERT(pCompCol->len > 0);
|
ASSERT(flen > 0);
|
||||||
pCompCol->len += sizeof(TSCKSUM);
|
flen += sizeof(TSCKSUM);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);
|
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
|
||||||
|
|
||||||
toffset += pCompCol->len;
|
if (ncol != 0) {
|
||||||
lsize += pCompCol->len;
|
pCompCol->offset = toffset;
|
||||||
|
pCompCol->len = flen;
|
||||||
tcol++;
|
tcol++;
|
||||||
|
} else {
|
||||||
|
keyLen = flen;
|
||||||
|
}
|
||||||
|
|
||||||
|
toffset += flen;
|
||||||
|
lsize += flen;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCompData->delimiter = TSDB_FILE_DELIMITER;
|
pCompData->delimiter = TSDB_FILE_DELIMITER;
|
||||||
|
@ -732,8 +740,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
pCompBlock->offset = offset;
|
pCompBlock->offset = offset;
|
||||||
pCompBlock->algorithm = pCfg->compression;
|
pCompBlock->algorithm = pCfg->compression;
|
||||||
pCompBlock->numOfRows = rowsToWrite;
|
pCompBlock->numOfRows = rowsToWrite;
|
||||||
pCompBlock->sversion = pHelper->tableInfo.sversion;
|
pCompBlock->len = lsize;
|
||||||
pCompBlock->len = (int32_t)lsize;
|
pCompBlock->keyLen = keyLen;
|
||||||
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
|
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
|
||||||
pCompBlock->numOfCols = nColsNotAllNull;
|
pCompBlock->numOfCols = nColsNotAllNull;
|
||||||
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
||||||
|
@ -1276,10 +1284,13 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
|
||||||
|
|
||||||
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
|
ASSERT(colIds[0] == 0);
|
||||||
|
|
||||||
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||||
|
SCompCol compCol = {0};
|
||||||
|
|
||||||
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
// If only load timestamp column, no need to load SCompData part
|
||||||
|
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
||||||
|
|
||||||
int dcol = 0;
|
int dcol = 0;
|
||||||
int ccol = 0;
|
int ccol = 0;
|
||||||
|
@ -1298,6 +1309,13 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
|
|
||||||
ASSERT(pDataCol->colId == colId);
|
ASSERT(pDataCol->colId == colId);
|
||||||
|
|
||||||
|
if (colId == 0) { // load the key row
|
||||||
|
compCol.colId = colId;
|
||||||
|
compCol.len = pCompBlock->keyLen;
|
||||||
|
compCol.type = pDataCol->type;
|
||||||
|
compCol.offset = TSDB_KEY_COL_OFFSET;
|
||||||
|
pCompCol = &compCol;
|
||||||
|
} else { // load non-key rows
|
||||||
while (ccol < pCompBlock->numOfCols) {
|
while (ccol < pCompBlock->numOfCols) {
|
||||||
pCompCol = &pHelper->pCompData->cols[ccol];
|
pCompCol = &pHelper->pCompData->cols[ccol];
|
||||||
if (pCompCol->colId >= colId) break;
|
if (pCompCol->colId >= colId) break;
|
||||||
|
@ -1311,10 +1329,11 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pCompCol->colId == pDataCol->colId);
|
ASSERT(pCompCol->colId == pDataCol->colId);
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
|
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
|
||||||
dcol++;
|
dcol++;
|
||||||
ccol++;
|
if (colId != 0) ccol++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1362,8 +1381,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
pDataCols->numOfRows = pCompBlock->numOfRows;
|
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||||
|
|
||||||
// Recover the data
|
// Recover the data
|
||||||
int ccol = 0;
|
int ccol = 0; // loop iter for SCompCol object
|
||||||
int dcol = 0;
|
int dcol = 0; // loop iter for SDataCols object
|
||||||
while (dcol < pDataCols->numOfCols) {
|
while (dcol < pDataCols->numOfCols) {
|
||||||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||||
if (ccol >= pCompData->numOfCols) {
|
if (ccol >= pCompData->numOfCols) {
|
||||||
|
@ -1373,12 +1392,23 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
int16_t tcolId = 0;
|
||||||
|
int32_t toffset = TSDB_KEY_COL_OFFSET;
|
||||||
|
int32_t tlen = pCompBlock->keyLen;
|
||||||
|
|
||||||
if (pCompCol->colId == pDataCol->colId) {
|
if (dcol != 0) {
|
||||||
|
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
||||||
|
tcolId = pCompCol->colId;
|
||||||
|
toffset = pCompCol->offset;
|
||||||
|
tlen = pCompCol->len;
|
||||||
|
} else {
|
||||||
|
ASSERT(pDataCol->colId == tcolId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tcolId == pDataCol->colId) {
|
||||||
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
||||||
int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||||
if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
|
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
|
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
|
||||||
}
|
}
|
||||||
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
|
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
|
||||||
|
@ -1387,16 +1417,16 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + toffset, tlen, pCompBlock->algorithm,
|
||||||
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
|
pCompBlock->numOfRows, pDataCols->maxPoints, pHelper->compBuffer,
|
||||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
tsizeof(pHelper->compBuffer)) < 0) {
|
||||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
|
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
||||||
pCompCol->colId, (int64_t)pCompCol->offset);
|
REPO_ID(pHelper->pRepo), pFile->fname, tcolId, (int64_t)pCompBlock->offset, toffset);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if (dcol != 0) ccol++;
|
||||||
dcol++;
|
dcol++;
|
||||||
ccol++;
|
} else if (tcolId < pDataCol->colId) {
|
||||||
} else if (pCompCol->colId < pDataCol->colId) {
|
|
||||||
ccol++;
|
ccol++;
|
||||||
} else {
|
} else {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
|
|
Loading…
Reference in New Issue