more code

This commit is contained in:
Hongze Cheng 2022-07-01 15:10:46 +00:00
parent 36fbf6d7cb
commit 7d50bfcb0d
6 changed files with 658 additions and 575 deletions

View File

@ -61,15 +61,17 @@ typedef struct {
uint64_t suid;
} STableListInfo;
#pragma pack(push, 1)
typedef struct SColumnDataAgg {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t maxIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SColumnDataAgg;
#pragma pack(pop)
typedef struct SDataBlockInfo {
STimeWindow window;
@ -114,7 +116,7 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow* twindows;
int64_t startVersion;

View File

@ -119,10 +119,7 @@ int32_t tPutBlockCol(uint8_t *p, void *ph);
int32_t tGetBlockCol(uint8_t *p, void *ph);
int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SBlock
#define tBlockInit() ((SBlock){0})
void tBlockReset(SBlock *pBlock);
void tBlockClear(SBlock *pBlock);
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest);
int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
@ -134,11 +131,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph);
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
// SColdata
#define tColDataInit() ((SColData){0})
void tColDataReset(SColData *pColData, int16_t cid, int8_t type);
void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
void tColDataClear(void *ph);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
// SBlockData
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
@ -166,9 +163,8 @@ void tsdbFree(uint8_t *pBuf);
#define tMapDataInit() ((SMapData){0})
void tMapDataReset(SMapData *pMapData);
void tMapDataClear(SMapData *pMapData);
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest);
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
@ -223,7 +219,6 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
// SDataFReader
@ -373,31 +368,32 @@ struct SBlockIdx {
struct SMapData {
int32_t nItem;
uint8_t flag;
uint8_t *pOfst;
uint32_t nData;
int32_t *aOffset;
int32_t nData;
uint8_t *pData;
uint8_t *pBuf;
};
typedef struct {
int16_t cid;
int8_t type;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int64_t offset;
int64_t bsize; // bitmap size
int64_t csize; // compressed column value size
int64_t osize; // original column value size (only save for variant data type)
int32_t offset;
int32_t szBitmap; // bitmap size
int32_t szOffset; // size of offset, only for variant-length data type
int32_t szValue; // compressed column value size
int32_t szOrigin; // original column value size (only save for variant data type)
} SBlockCol;
typedef struct {
int64_t nRow;
int8_t cmprAlg;
int64_t offset;
int64_t szVersion; // VERSION size
int64_t szTSKEY; // TSKEY size
int64_t szBlock; // total block size
SMapData mBlockCol; // SMapData<SBlockCol>
int32_t nRow;
int8_t cmprAlg;
int64_t offset; // block data offset
int32_t szBlockCol; // SBlockCol size
int32_t szVersion; // VERSION size
int32_t szTSKEY; // TSKEY size
int32_t szBlock; // total block size
int64_t sOffset; // sma offset
int32_t nSma; // sma size
} SSubBlock;
struct SBlock {
@ -425,7 +421,7 @@ struct SAggrBlkCol {
struct SColData {
int16_t cid;
int8_t type;
int8_t offsetValid;
int8_t smaOn;
int32_t nVal;
uint8_t flag;
uint8_t *pBitMap;

View File

@ -33,12 +33,10 @@ typedef struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SBlock oBlock;
SBlockData oBlockData;
SDataFWriter *pWriter;
SArray *aBlockIdxN; // SArray<SBlockIdx>
SMapData nBlockMap; // SMapData<SBlock>
SBlock nBlock;
SBlockData nBlockData;
int64_t suid;
int64_t uid;
@ -260,7 +258,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// old
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockReset(&pCommitter->oBlock);
tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
if (pRSet) {
@ -274,7 +271,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// new
taosArrayClear(pCommitter->aBlockIdxN);
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->nBlock);
tBlockDataReset(&pCommitter->nBlockData);
if (pRSet) {
wSet = (SDFileSet){.diskId = pRSet->diskId,
@ -351,11 +347,6 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
#if 0
code = tsdbWriteBlockSMA(pCommitter, pBlockData, pBlock);
if (code) goto _err;
#endif
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err;
@ -371,7 +362,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlock *pBlock = &pCommitter->nBlock;
SBlock block;
SBlock *pBlock = &block;
TSDBROW *pRow1;
TSDBROW row2;
TSDBROW *pRow2 = &row2;
@ -469,7 +461,8 @@ _err:
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0;
TSDBROW *pRow;
SBlock *pBlock = &pCommitter->nBlock;
SBlock block;
SBlock *pBlock = &block;
SBlockData *pBlockData = &pCommitter->nBlockData;
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
@ -519,13 +512,14 @@ _err:
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SBlock block;
if (pBlock->last) {
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
if (code) goto _err;
tBlockReset(&pCommitter->nBlock);
code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &pCommitter->nBlock, pBlockIdx, 0);
tBlockReset(&block);
code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
if (code) goto _err;
} else {
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
@ -590,6 +584,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlock block;
TSDBROW *pRow;
tBlockDataReset(pBlockData);
@ -617,11 +612,8 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
}
}
// write as a subblock
code = tBlockCopy(pBlock, &pCommitter->nBlock);
if (code) goto _err;
code = tsdbCommitBlockData(pCommitter, pBlockData, &pCommitter->nBlock, pBlockIdx, 0);
block = *pBlock;
code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
if (code) goto _err;
return code;
@ -670,7 +662,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
SBlock *pBlock = &pCommitter->oBlock;
SBlock block;
SBlock *pBlock = &block;
iBlock = 0;
if (iBlock < nBlock) {
@ -895,6 +888,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
_err:
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbDataFReaderClose(&pCommitter->pReader);
tsdbDataFWriterClose(&pCommitter->pWriter, 0);
return code;
}
@ -931,35 +926,35 @@ _err:
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
pCommitter->pReader = NULL;
pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
pCommitter->oBlockMap = tMapDataInit();
pCommitter->oBlock = tBlockInit();
pCommitter->pWriter = NULL;
pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
pCommitter->nBlockMap = tMapDataInit();
pCommitter->nBlock = tBlockInit();
code = tBlockDataInit(&pCommitter->oBlockData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->nBlockData);
if (code) {
tBlockDataClear(&pCommitter->oBlockData);
if (pCommitter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->aBlockIdxN == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataInit(&pCommitter->oBlockData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->nBlockData);
if (code) goto _exit;
_exit:
return code;
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx);
// tMapDataClear(&pCommitter->oBlockMap);
// tBlockClear(&pCommitter->oBlock);
// tBlockDataClear(&pCommitter->oBlockData);
tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData);
taosArrayDestroy(pCommitter->aBlockIdxN);
// tMapDataClear(&pCommitter->nBlockMap);
// tBlockClear(&pCommitter->nBlock);
// tBlockDataClear(&pCommitter->nBlockData);
tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData);
}
static int32_t tsdbCommitData(SCommitter *pCommitter) {

View File

@ -807,10 +807,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
for (int32_t j = 0; j < mapData.nItem; ++j) {
SBlock block = {0};
int32_t code = tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);
// 1. time range check
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {

View File

@ -489,6 +489,7 @@ _err:
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
int32_t code = 0;
if (*ppReader == NULL) goto _exit;
if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
@ -511,6 +512,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
}
taosMemoryFree(*ppReader);
_exit:
*ppReader = NULL;
return code;
@ -586,11 +589,14 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
uint8_t *pBuf = NULL;
int64_t n;
int64_t tn;
SBlockDataHdr hdr;
if (!ppBuf) ppBuf = &pBuf;
// alloc
if (!ppBuf) ppBuf = &mBlock->pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
@ -623,17 +629,24 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
ASSERT(hdr.uid == pBlockIdx->uid);
n = sizeof(hdr);
n += tGetMapData(*ppBuf + n, mBlock);
tn = tGetMapData(*ppBuf + n, mBlock);
if (tn < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
n += tn;
ASSERT(n + sizeof(TSCKSUM) == size);
tsdbFree(pBuf);
return code;
_err:
tsdbError("vgId:%d read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf);
return code;
}
static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
int64_t n;
@ -656,7 +669,6 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion);
// TSKEY
pBuf = pBuf + pSubBlock->szVersion;
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
@ -674,9 +686,9 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
}
// TSKEY
pBuf = pBuf + pSubBlock->szVersion;
n = tsDecompressTimestamp(pBuf, pSubBlock->szTSKEY, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
n = tsDecompressTimestamp(pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY, pSubBlock->nRow,
(char *)pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf,
size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
@ -689,15 +701,13 @@ _err:
return code;
}
static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock, SBlockCol *pBlockCol,
SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) {
static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf,
uint8_t **ppBuf) {
int32_t code = 0;
int64_t size;
int64_t n;
ASSERT(pBlockCol->flag != HAS_NULL);
if (!taosCheckChecksumWhole(pBuf, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
if (!taosCheckChecksumWhole(pBuf, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
@ -705,40 +715,71 @@ static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock,
pColData->nVal = pSubBlock->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
// BITMAP
if (pBlockCol->flag != HAS_VALUE) {
size = BIT2_SIZE(pSubBlock->nRow);
ASSERT(pBlockCol->szBitmap);
size = BIT2_SIZE(pColData->nVal);
code = tsdbRealloc(&pColData->pBitMap, size);
if (code) goto _err;
ASSERT(pBlockCol->bsize == size);
code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _err;
memcpy(pColData->pBitMap, pBuf, size);
} else {
ASSERT(pBlockCol->bsize == 0);
}
pBuf = pBuf + pBlockCol->bsize;
n = tsDecompressTinyint(pBuf, pBlockCol->szBitmap, size, pColData->pBitMap, size, TWO_STAGE_COMP, *ppBuf,
size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// value
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
pColData->nData = pBlockCol->osize;
ASSERT(n == size);
} else {
pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
ASSERT(pBlockCol->szBitmap == 0);
}
pBuf = pBuf + pBlockCol->szBitmap;
// OFFSET
if (IS_VAR_DATA_TYPE(pColData->type)) {
ASSERT(pBlockCol->szOffset);
size = sizeof(int32_t) * pColData->nVal;
code = tsdbRealloc((uint8_t **)&pColData->aOffset, size);
if (code) goto _err;
code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _err;
n = tsDecompressInt(pBuf, pBlockCol->szOffset, pColData->nVal, (char *)pColData->aOffset, size, TWO_STAGE_COMP,
*ppBuf, size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == size);
} else {
ASSERT(pBlockCol->szOffset == 0);
}
pBuf = pBuf + pBlockCol->szOffset;
// VALUE
pColData->nData = pBlockCol->szOrigin;
code = tsdbRealloc(&pColData->pData, pColData->nData);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
memcpy(pColData->pData, pBuf, pColData->nData);
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf, size);
code = tsdbRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES);
if (code) goto _err;
}
n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->csize, pSubBlock->nRow, pColData->pData,
pColData->nData, pSubBlock->cmprAlg, *ppBuf, size);
n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->szValue, pSubBlock->nRow, pColData->pData,
pColData->nData, pSubBlock->cmprAlg, *ppBuf,
pColData->nData + COMP_OVERFLOW_BYTES);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
@ -753,11 +794,41 @@ _err:
return code;
}
static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlockCol) {
int32_t code = 0;
int32_t n = 0;
SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol;
if (!taosCheckChecksumWhole(p, pSubBlock->szBlockCol + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
n += sizeof(SBlockDataHdr);
while (n < pSubBlock->szBlockCol) {
n += tGetBlockCol(p + n, pBlockCol);
if (taosArrayPush(aBlockCol, pBlockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n == pSubBlock->szBlockCol);
return code;
_err:
return code;
}
static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL;
int32_t code = 0;
int64_t offset;
int64_t size;
@ -766,9 +837,15 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
tBlockDataReset(pBlockData);
pBlockData->nRow = pSubBlock->nRow;
// TSDBKEY
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
// TSDBKEY and SBlockCol
if (nCol == 1) {
offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM);
size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
} else {
offset = pSubBlock->offset;
size = pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
}
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
@ -787,31 +864,47 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
goto _err;
}
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err;
if (nCol == 1) {
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err;
goto _exit;
} else {
aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol);
if (code) goto _err;
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2);
if (code) goto _err;
}
// OTHER
SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol;
SColData *pColData;
for (int32_t iCol = 1; iCol < nCol; iCol++) {
int16_t cid = aColId[iCol];
void *p = taosArraySearch(aBlockCol, &(SBlockCol){.cid = aColId[iCol]}, tBlockColCmprFn, TD_EQ);
if (p) {
SBlockCol *pBlockCol = (SBlockCol *)p;
SColData *pColData;
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
0) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->szVersion + pSubBlock->szTSKEY +
sizeof(TSCKSUM) + pBlockCol->offset;
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion +
pSubBlock->szTSKEY + sizeof(TSCKSUM) + pBlockCol->offset;
size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
@ -833,14 +926,18 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
goto _err;
}
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
if (code) goto _err;
}
}
}
_exit:
taosArrayDestroy(aBlockCol);
return code;
_err:
taosArrayDestroy(aBlockCol);
return code;
}
@ -855,7 +952,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2);
code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2);
if (code) goto _err;
if (pBlock->nSubBlock > 1) {
@ -863,7 +960,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
SBlockData *pBlockData2 = &(SBlockData){0};
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
if (code) goto _err;
code = tBlockDataCopy(pBlockData, pBlockData2);
@ -904,7 +1001,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
int64_t n;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SBlockCol *pBlockCol = &(SBlockCol){0};
SArray *aBlockCol = NULL;
tBlockDataReset(pBlockData);
@ -929,41 +1026,52 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
goto _err;
}
// recover
pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(SBlockDataHdr);
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
// TSDBKEY
p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM);
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
if (code) goto _err;
p = p + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
SColData *pColData;
// COLUMNS
aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol);
if (code) goto _err;
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
SColData *pColData;
SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol);
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, p, ppBuf2);
p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY +
sizeof(TSCKSUM) + pBlockCol->offset;
code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, p, ppBuf2);
if (code) goto _err;
p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
}
}
taosArrayDestroy(aBlockCol);
return code;
_err:
tsdbError("vgId:%d tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
taosArrayDestroy(aBlockCol);
return code;
}
@ -1189,6 +1297,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
int32_t code = 0;
STsdb *pTsdb = (*ppWriter)->pTsdb;
if (*ppWriter == NULL) goto _exit;
if (sync) {
if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
@ -1232,6 +1342,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
}
taosMemoryFree(*ppWriter);
_exit:
*ppWriter = NULL;
return code;
@ -1366,43 +1477,308 @@ _err:
return code;
}
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) {
int32_t code = 0;
SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++];
SBlockCol *pBlockCol = &(SBlockCol){0};
int64_t size;
int64_t n;
TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD;
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
TSCKSUM cksm;
uint8_t *p;
int64_t offset;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
TSKEY lastKey = TSKEY_MIN;
static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) {
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = TSDBROW_KEY(&tsdbRowFromBlockData(pBlockData, iRow));
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
if (iRow == 0) {
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, key);
if (tsdbKeyCmprFn(&pBlock->minKey, &key) > 0) {
pBlock->minKey = key;
}
} else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
pBlock->hasDup = 1;
}
}
if (iRow == pBlockData->nRow - 1) {
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, key);
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pBlock->maxKey, &key) < 0) {
pBlock->maxKey = key;
}
pBlock->minVersion = TMIN(pBlock->minVersion, key.version);
pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version);
if (key.ts == lastKey) {
pBlock->hasDup = 1;
}
lastKey = key.ts;
}
pBlock->nRow += pBlockData->nRow;
}
static int32_t tsdbWriteBlockDataKey(SSubBlock *pSubBlock, SBlockData *pBlockData, uint8_t **ppBuf1, int64_t *nDataP,
uint8_t **ppBuf2) {
int32_t code = 0;
int64_t size;
int64_t tsize;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
pSubBlock->szVersion = sizeof(int64_t) * pSubBlock->nRow;
pSubBlock->szTSKEY = sizeof(TSKEY) * pSubBlock->nRow;
code = tsdbRealloc(ppBuf1, *nDataP + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
if (code) goto _err;
// VERSION
memcpy(*ppBuf1 + *nDataP, pBlockData->aVersion, pSubBlock->szVersion);
// TSKEY
memcpy(*ppBuf1 + *nDataP + pSubBlock->szVersion, pBlockData->aTSKEY, pSubBlock->szTSKEY);
} else {
size = (sizeof(int64_t) + sizeof(TSKEY)) * pSubBlock->nRow + COMP_OVERFLOW_BYTES * 2;
code = tsdbRealloc(ppBuf1, *nDataP + size + sizeof(TSCKSUM));
if (code) goto _err;
tsize = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, tsize);
if (code) goto _err;
}
// VERSION
pSubBlock->szVersion =
tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + *nDataP, size, pSubBlock->cmprAlg, *ppBuf2, tsize);
if (pSubBlock->szVersion <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// TSKEY
pSubBlock->szTSKEY = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow,
pBlockData->nRow, *ppBuf1 + *nDataP + pSubBlock->szVersion,
size - pSubBlock->szVersion, pSubBlock->cmprAlg, *ppBuf2, tsize);
if (pSubBlock->szTSKEY <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(pSubBlock->szVersion + pSubBlock->szTSKEY <= size);
}
// checksum
size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, size);
*nDataP += size;
return code;
_err:
return code;
}
static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBlock *pSubBlock, uint8_t **ppBuf1,
int64_t *nDataP, uint8_t **ppBuf2) {
int32_t code = 0;
int64_t size;
int64_t n = 0;
// BITMAP
if (pColData->flag != HAS_VALUE) {
size = BIT2_SIZE(pColData->nVal) + COMP_OVERFLOW_BYTES;
code = tsdbRealloc(ppBuf1, *nDataP + n + size);
if (code) goto _err;
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
pBlockCol->szBitmap =
tsCompressTinyint((char *)pColData->pBitMap, BIT2_SIZE(pColData->nVal), BIT2_SIZE(pColData->nVal),
*ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size);
if (pBlockCol->szBitmap <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
} else {
pBlockCol->szBitmap = 0;
}
n += pBlockCol->szBitmap;
// OFFSET
if (IS_VAR_DATA_TYPE(pColData->type)) {
size = sizeof(int32_t) * pColData->nVal + COMP_OVERFLOW_BYTES;
code = tsdbRealloc(ppBuf1, *nDataP + n + size);
if (code) goto _err;
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
pBlockCol->szOffset = tsCompressInt((char *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, pColData->nVal,
*ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size);
if (pBlockCol->szOffset <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
} else {
pBlockCol->szOffset = 0;
}
n += pBlockCol->szOffset;
// VALUE
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
pBlockCol->szValue = pColData->nData;
code = tsdbRealloc(ppBuf1, *nDataP + n + pBlockCol->szValue + sizeof(TSCKSUM));
if (code) goto _err;
memcpy(*ppBuf1 + *nDataP + n, pColData->pData, pBlockCol->szValue);
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES;
code = tsdbRealloc(ppBuf1, *nDataP + n + size + sizeof(TSCKSUM));
if (code) goto _err;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
pBlockCol->szValue =
tDataTypes[pColData->type].compFunc((char *)pColData->pData, pColData->nData, pColData->nVal,
*ppBuf1 + *nDataP + n, size, pSubBlock->cmprAlg, *ppBuf2, size);
if (pBlockCol->szValue <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
}
n += pBlockCol->szValue;
pBlockCol->szOrigin = pColData->nData;
// checksum
n += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, n);
*nDataP += n;
return code;
_err:
return code;
}
static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SSubBlock *pSubBlock, SBlockDataHdr hdr, SArray *aBlockCol,
uint8_t *pData, int64_t nData, uint8_t **ppBuf) {
int32_t code = 0;
int32_t nBlockCol = taosArrayGetSize(aBlockCol);
int64_t size;
int64_t n;
// HDR + SArray<SBlockCol>
pSubBlock->szBlockCol = sizeof(hdr);
for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) {
pSubBlock->szBlockCol += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol));
}
code = tsdbRealloc(ppBuf, pSubBlock->szBlock + sizeof(TSCKSUM));
if (code) goto _err;
n = 0;
memcpy(*ppBuf, &hdr, sizeof(hdr));
n += sizeof(hdr);
for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) {
n += tPutBlockCol(*ppBuf + n, taosArrayGet(aBlockCol, iBlockCol));
}
taosCalcChecksumAppend(0, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM));
ASSERT(n == pSubBlock->szBlockCol);
n = taosWriteFile(pFD, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// SBlockData
n = taosWriteFile(pFD, pData, nData);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
return code;
_err:
return code;
}
static void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
SColVal *pColVal = &colVal;
*pColAgg = (SColumnDataAgg){.colId = pColData->cid};
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
tColDataGetValue(pColData, iVal, pColVal);
if (pColVal->isNone || pColVal->isNull) {
pColAgg->numOfNull++;
} else {
// TODO:
ASSERT(0);
}
}
}
static int32_t tsdbWriteBlockSMA(TdFilePtr pFD, SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t **ppBuf) {
int32_t code = 0;
int64_t n;
SColData *pColData;
// prepare
pSubBlock->nSma = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) {
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData);
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
pSubBlock->nSma++;
}
if (pSubBlock->nSma == 0) goto _exit;
// calc
code = tsdbRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
if (code) goto _err;
n = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) {
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData);
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
tsdbCalcColDataSMA(pColData, &((SColumnDataAgg *)(*ppBuf))[n]);
n++;
}
taosCalcChecksumAppend(0, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
// write
n = taosWriteFile(pFD, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
_exit:
return code;
_err:
return code;
}
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) {
int32_t code = 0;
SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++];
SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol;
int64_t n;
TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD;
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
uint8_t *p;
int64_t nData;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
SArray *aBlockCol = NULL;
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
tsdbUpdateBlockInfo(pBlockData, pBlock);
pSubBlock->nRow = pBlockData->nRow;
pSubBlock->cmprAlg = cmprAlg;
@ -1411,93 +1787,20 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
} else {
pSubBlock->offset = pWriter->wSet.fData.size;
}
pSubBlock->szBlock = 0;
// HDR
n = taosWriteFile(pFileFD, &hdr, sizeof(hdr));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
// ======================= BLOCK DATA =======================
// TSDBKEY
nData = 0;
code = tsdbWriteBlockDataKey(pSubBlock, pBlockData, ppBuf1, &nData, ppBuf2);
if (code) goto _err;
// COLUMNS
aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aColDataP), sizeof(SBlockCol));
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pSubBlock->szBlock += n;
// TSDBKEY
if (cmprAlg == NO_COMPRESSION) {
cksm = 0;
// version
pSubBlock->szVersion = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->szVersion);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->szVersion);
// TSKEY
pSubBlock->szTSKEY = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->szTSKEY);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->szTSKEY);
// cksm
size = sizeof(cksm);
n = taosWriteFile(pFileFD, (uint8_t *)&cksm, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP);
size = (sizeof(int64_t) + sizeof(TSKEY)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// version
n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, *ppBuf1,
size, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->szVersion = n;
// TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->szVersion, size - pSubBlock->szVersion, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->szTSKEY = n;
// cksm
n = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
ASSERT(n <= size);
taosCalcChecksumAppend(0, *ppBuf1, n);
// write
n = taosWriteFile(pFileFD, *ppBuf1, n);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
pSubBlock->szBlock += (pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
// other columns
offset = 0;
tMapDataReset(&pSubBlock->mBlockCol);
int32_t offset = 0;
for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol);
@ -1510,102 +1813,54 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pBlockCol->flag = pColData->flag;
if (pColData->flag != HAS_NULL) {
cksm = 0;
code = tsdbWriteColData(pColData, pBlockCol, pSubBlock, ppBuf1, &nData, ppBuf2);
if (code) goto _err;
pBlockCol->offset = offset;
// bitmap
if (pColData->flag == HAS_VALUE) {
pBlockCol->bsize = 0;
} else {
pBlockCol->bsize = BIT2_SIZE(pBlockData->nRow);
n = taosWriteFile(pFileFD, pColData->pBitMap, pBlockCol->bsize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, pColData->pBitMap, n);
}
// data
if (cmprAlg == NO_COMPRESSION) {
// data
n = taosWriteFile(pFileFD, pColData->pData, pColData->nData);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pBlockCol->csize = n;
pBlockCol->osize = n;
// checksum
cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData);
n = taosWriteFile(pFileFD, &cksm, sizeof(cksm));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// data
n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size,
cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pBlockCol->csize = n;
pBlockCol->osize = pColData->nData;
// cksm
n += sizeof(TSCKSUM);
ASSERT(n <= size);
taosCalcChecksumAppend(cksm, *ppBuf1, n);
// write
n = taosWriteFile(pFileFD, *ppBuf1, n);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
// state
offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
pSubBlock->szBlock = pSubBlock->szBlock + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
offset = offset + pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
}
code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol);
if (code) goto _err;
if (taosArrayPush(aBlockCol, pBlockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
// write
code = tsdbWriteBlockDataImpl(pFileFD, pSubBlock, hdr, aBlockCol, *ppBuf1, nData, ppBuf2);
if (code) goto _err;
pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nData;
if (pBlock->last) {
pWriter->wSet.fLast.size += pSubBlock->szBlock;
} else {
pWriter->wSet.fData.size += pSubBlock->szBlock;
}
// ======================= BLOCK SMA =======================
pSubBlock->sOffset = 0;
pSubBlock->nSma = 0;
if (pBlock->nSubBlock > 1 || pBlock->last || pBlock->hasDup) goto _exit;
code = tsdbWriteBlockSMA(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1);
if (code) goto _err;
if (pSubBlock->nSma > 0) {
pSubBlock->sOffset = pWriter->wSet.fSma.size;
pWriter->wSet.fSma.size += (sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
}
_exit:
tsdbFree(pBuf1);
tsdbFree(pBuf2);
taosArrayDestroy(aBlockCol);
return code;
_err:
tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf1);
tsdbFree(pBuf2);
return code;
}
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize) {
int32_t code = 0;
// TODO
taosArrayDestroy(aBlockCol);
return code;
}

View File

@ -15,57 +15,15 @@
#include "tsdb.h"
#define TSDB_OFFSET_I32 ((uint8_t)0)
#define TSDB_OFFSET_I16 ((uint8_t)1)
#define TSDB_OFFSET_I8 ((uint8_t)2)
// SMapData =======================================================================
void tMapDataReset(SMapData *pMapData) {
pMapData->flag = TSDB_OFFSET_I32;
pMapData->nItem = 0;
pMapData->nData = 0;
}
void tMapDataClear(SMapData *pMapData) {
if (pMapData->pBuf) {
tsdbFree(pMapData->pBuf);
} else {
tsdbFree(pMapData->pOfst);
tsdbFree(pMapData->pData);
}
}
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) {
int32_t code = 0;
int32_t size;
pMapDataDest->nItem = pMapDataSrc->nItem;
pMapDataDest->flag = pMapDataSrc->flag;
switch (pMapDataDest->flag) {
case TSDB_OFFSET_I32:
size = sizeof(int32_t) * pMapDataDest->nItem;
break;
case TSDB_OFFSET_I16:
size = sizeof(int16_t) * pMapDataDest->nItem;
break;
case TSDB_OFFSET_I8:
size = sizeof(int8_t) * pMapDataDest->nItem;
break;
default:
ASSERT(0);
}
code = tsdbRealloc(&pMapDataDest->pOfst, size);
if (code) goto _exit;
memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size);
pMapDataDest->nData = pMapDataSrc->nData;
code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData);
if (code) goto _exit;
memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData);
_exit:
return code;
tsdbFree((uint8_t *)pMapData->aOffset);
tsdbFree(pMapData->pData);
}
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
@ -77,35 +35,19 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
pMapData->nData += tPutItemFn(NULL, pItem);
// alloc
code = tsdbRealloc(&pMapData->pOfst, sizeof(int32_t) * pMapData->nItem);
code = tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
if (code) goto _err;
code = tsdbRealloc(&pMapData->pData, pMapData->nData);
if (code) goto _err;
// put
ASSERT(pMapData->flag == TSDB_OFFSET_I32);
((int32_t *)pMapData->pOfst)[nItem] = offset;
pMapData->aOffset[nItem] = offset;
tPutItemFn(pMapData->pData + offset, pItem);
_err:
return code;
}
static FORCE_INLINE int32_t tMapDataGetOffset(SMapData *pMapData, int32_t idx) {
switch (pMapData->flag) {
case TSDB_OFFSET_I8:
return ((int8_t *)pMapData->pOfst)[idx];
break;
case TSDB_OFFSET_I16:
return ((int16_t *)pMapData->pOfst)[idx];
break;
case TSDB_OFFSET_I32:
return ((int32_t *)pMapData->pOfst)[idx];
break;
default:
ASSERT(0);
}
}
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
int32_t code = 0;
@ -135,58 +77,25 @@ _exit:
return code;
}
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
int32_t code = 0;
if (idx < 0 || idx >= pMapData->nItem) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
tGetItemFn(pMapData->pData + tMapDataGetOffset(pMapData, idx), pItem);
_exit:
return code;
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
ASSERT(idx >= 0 && idx < pMapData->nItem);
tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
}
int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
int32_t n = 0;
ASSERT(pMapData->flag == TSDB_OFFSET_I32);
n += tPutI32v(p ? p + n : p, pMapData->nItem);
if (pMapData->nItem) {
int32_t maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1);
if (maxOffset <= INT8_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8);
if (p) {
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI8(p + n, (int8_t)tMapDataGetOffset(pMapData, iItem));
}
} else {
n = n + sizeof(int8_t) * pMapData->nItem;
}
} else if (maxOffset <= INT16_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16);
if (p) {
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI16(p + n, (int16_t)tMapDataGetOffset(pMapData, iItem));
}
} else {
n = n + sizeof(int16_t) * pMapData->nItem;
}
} else {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32);
if (p) {
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI32(p + n, tMapDataGetOffset(pMapData, iItem));
}
} else {
n = n + sizeof(int32_t) * pMapData->nItem;
}
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem]);
}
n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData);
n += tPutI32v(p ? p + n : p, pMapData->nData);
if (p) {
memcpy(p + n, pMapData->pData, pMapData->nData);
}
n += pMapData->nData;
}
return n;
@ -194,26 +103,22 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
int32_t n = 0;
int32_t offset;
tMapDataReset(pMapData);
n += tGetI32v(p + n, &pMapData->nItem);
if (pMapData->nItem) {
n += tGetU8(p + n, &pMapData->flag);
pMapData->pOfst = p + n;
switch (pMapData->flag) {
case TSDB_OFFSET_I8:
n = n + sizeof(int8_t) * pMapData->nItem;
break;
case TSDB_OFFSET_I16:
n = n + sizeof(int16_t) * pMapData->nItem;
break;
case TSDB_OFFSET_I32:
n = n + sizeof(int32_t) * pMapData->nItem;
break;
if (tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
default:
ASSERT(0);
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
n += tGetI32v(p + n, &pMapData->aOffset[iItem]);
}
n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData);
n += tGetI32v(p + n, &pMapData->nData);
if (tsdbRealloc(&pMapData->pData, pMapData->nData)) return -1;
memcpy(pMapData->pData, p + n, pMapData->nData);
n += pMapData->nData;
}
return n;
@ -377,55 +282,8 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
// SBlock ======================================================
void tBlockReset(SBlock *pBlock) {
pBlock->minKey = TSDBKEY_MAX;
pBlock->maxKey = TSDBKEY_MIN;
pBlock->minVersion = VERSION_MAX;
pBlock->maxVersion = VERSION_MIN;
pBlock->nRow = 0;
pBlock->last = -1;
pBlock->hasDup = 0;
for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) {
pBlock->aSubBlock[iSubBlock].nRow = 0;
pBlock->aSubBlock[iSubBlock].cmprAlg = -1;
pBlock->aSubBlock[iSubBlock].offset = -1;
pBlock->aSubBlock[iSubBlock].szVersion = -1;
pBlock->aSubBlock[iSubBlock].szTSKEY = -1;
pBlock->aSubBlock[iSubBlock].szBlock = -1;
tMapDataReset(&pBlock->aSubBlock->mBlockCol);
}
pBlock->nSubBlock = 0;
}
void tBlockClear(SBlock *pBlock) {
for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) {
tMapDataClear(&pBlock->aSubBlock->mBlockCol);
}
}
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) {
int32_t code = 0;
pBlockDest->minKey = pBlockSrc->minKey;
pBlockDest->maxKey = pBlockSrc->maxKey;
pBlockDest->minVersion = pBlockSrc->minVersion;
pBlockDest->maxVersion = pBlockSrc->maxVersion;
pBlockDest->nRow = pBlockSrc->nRow;
pBlockDest->last = pBlockSrc->last;
pBlockDest->hasDup = pBlockSrc->hasDup;
pBlockDest->nSubBlock = pBlockSrc->nSubBlock;
for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) {
pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow;
pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg;
pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset;
pBlockDest->aSubBlock[iSubBlock].szVersion = pBlockSrc->aSubBlock[iSubBlock].szVersion;
pBlockDest->aSubBlock[iSubBlock].szTSKEY = pBlockSrc->aSubBlock[iSubBlock].szTSKEY;
pBlockDest->aSubBlock[iSubBlock].szBlock = pBlockSrc->aSubBlock[iSubBlock].szBlock;
code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol);
if (code) goto _exit;
}
_exit:
return code;
*pBlock =
(SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVersion = VERSION_MAX, .maxVersion = VERSION_MIN};
}
int32_t tPutBlock(uint8_t *p, void *ph) {
@ -441,13 +299,15 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlockCol);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].sOffset);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nSma);
}
return n;
@ -466,20 +326,21 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlockCol);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].sOffset);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nSma);
}
return n;
}
int32_t tBlockCmprFn(const void *p1, const void *p2) {
int32_t c;
SBlock *pBlock1 = (SBlock *)p1;
SBlock *pBlock2 = (SBlock *)p2;
@ -504,14 +365,11 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
n += tPutI8(p ? p + n : p, pBlockCol->flag);
if (pBlockCol->flag != HAS_NULL) {
n += tPutI64v(p ? p + n : p, pBlockCol->offset);
if (pBlockCol->flag != HAS_VALUE) {
n += tPutI64v(p ? p + n : p, pBlockCol->bsize);
}
n += tPutI64v(p ? p + n : p, pBlockCol->csize);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tPutI64v(p ? p + n : p, pBlockCol->osize);
}
n += tPutI32v(p ? p + n : p, pBlockCol->offset);
n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap);
n += tPutI32v(p ? p + n : p, pBlockCol->szOffset);
n += tPutI32v(p ? p + n : p, pBlockCol->szValue);
n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin);
}
return n;
@ -528,18 +386,11 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
if (pBlockCol->flag != HAS_NULL) {
n += tGetI64v(p + n, &pBlockCol->offset);
if (pBlockCol->flag != HAS_VALUE) {
n += tGetI64v(p + n, &pBlockCol->bsize);
} else {
pBlockCol->bsize = 0;
}
n += tGetI64v(p + n, &pBlockCol->csize);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tGetI64v(p + n, &pBlockCol->osize);
} else {
pBlockCol->osize = -1;
}
n += tGetI32v(p + n, &pBlockCol->offset);
n += tGetI32v(p + n, &pBlockCol->szBitmap);
n += tGetI32v(p + n, &pBlockCol->szOffset);
n += tGetI32v(p + n, &pBlockCol->szValue);
n += tGetI32v(p + n, &pBlockCol->szOrigin);
}
return n;
@ -942,12 +793,12 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
}
// SColData ========================================
void tColDataReset(SColData *pColData, int16_t cid, int8_t type) {
void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
pColData->cid = cid;
pColData->type = type;
pColData->smaOn = smaOn;
pColData->nVal = 0;
pColData->flag = 0;
pColData->offsetValid = 0;
pColData->nData = 0;
}
@ -977,26 +828,35 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
if (pColVal->isNone) {
pColData->flag |= HAS_NONE;
SET_BIT2(pColData->pBitMap, pColData->nVal, 0);
if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL;
} else if (pColVal->isNull) {
pColData->flag |= HAS_NULL;
SET_BIT2(pColData->pBitMap, pColData->nVal, 1);
if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL;
} else {
pColData->flag |= HAS_VALUE;
SET_BIT2(pColData->pBitMap, pColData->nVal, 2);
pValue = &pColVal->value;
}
if (pValue) {
code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, &pColVal->value, pColVal->type));
if (IS_VAR_DATA_TYPE(pColData->type)) {
// offset
code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * (pColData->nVal + 1));
if (code) goto _exit;
pColData->aOffset[pColData->nVal] = pColData->nData;
pColData->nData += tPutValue(pColData->pData + pColData->nData, &pColVal->value, pColVal->type);
// value
if ((!pColVal->isNone) && (!pColVal->isNull)) {
code = tsdbRealloc(&pColData->pData, pColData->nData + pColVal->value.nData);
if (code) goto _exit;
memcpy(pColData->pData + pColData->nData, pColVal->value.pData, pColVal->value.nData);
pColData->nData += pColVal->value.nData;
}
} else {
code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, pValue, pColVal->type));
if (code) goto _exit;
pColData->nData += tPutValue(pColData->pData + pColData->nData, pValue, pColVal->type);
}
pColData->nVal++;
pColData->offsetValid = 0;
_exit:
return code;
@ -1004,56 +864,32 @@ _exit:
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
int32_t code = 0;
int32_t size;
pColDataDest->cid = pColDataDest->cid;
pColDataDest->type = pColDataDest->type;
pColDataDest->offsetValid = 0;
pColDataDest->cid = pColDataSrc->cid;
pColDataDest->type = pColDataSrc->type;
pColDataDest->smaOn = pColDataSrc->smaOn;
pColDataDest->nVal = pColDataSrc->nVal;
pColDataDest->flag = pColDataSrc->flag;
if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) {
code = tsdbRealloc(&pColDataDest->pBitMap, BIT2_SIZE(pColDataDest->nVal));
size = BIT2_SIZE(pColDataSrc->nVal);
code = tsdbRealloc(&pColDataDest->pBitMap, size);
if (code) goto _exit;
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size);
if (IS_VAR_DATA_TYPE(pColDataDest->type)) {
size = sizeof(int32_t) * pColDataSrc->nVal;
code = tsdbRealloc((uint8_t **)&pColDataDest->aOffset, size);
if (code) goto _exit;
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, BIT2_SIZE(pColDataSrc->nVal));
memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size);
}
pColDataDest->nData = pColDataSrc->nData;
code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData);
if (code) goto _exit;
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataSrc->nData);
_exit:
return code;
}
static int32_t tColDataUpdateOffset(SColData *pColData) {
int32_t code = 0;
SValue value;
ASSERT(pColData->nVal > 0);
ASSERT(pColData->flag);
ASSERT(IS_VAR_DATA_TYPE(pColData->type));
if ((pColData->flag & HAS_VALUE)) {
code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal);
if (code) goto _exit;
int32_t offset = 0;
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
if (pColData->flag != HAS_VALUE) {
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
if (v == 0 || v == 1) {
pColData->aOffset[iVal] = -1;
continue;
}
}
pColData->aOffset[iVal] = offset;
offset += tGetValue(pColData->pData + offset, &value, pColData->type);
}
ASSERT(offset == pColData->nData);
pColData->offsetValid = 1;
}
pColDataDest->nData = pColDataSrc->nData;
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData);
_exit:
return code;
@ -1085,11 +921,13 @@ int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
// get value
SValue value;
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (!pColData->offsetValid) {
code = tColDataUpdateOffset(pColData);
if (code) goto _exit;
if (iVal + 1 < pColData->nVal) {
value.nData = pColData->aOffset[iVal + 1] - pColData->aOffset[iVal];
} else {
value.nData = pColData->nData - pColData->aOffset[iVal];
}
tGetValue(pColData->pData + pColData->aOffset[iVal], &value, pColData->type);
value.pData = pColData->pData + pColData->aOffset[iVal];
} else {
tGetValue(pColData->pData + tDataTypes[pColData->type].bytes * iVal, &value, pColData->type);
}
@ -1210,7 +1048,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
if (code) goto _err;
// append a NONE
tColDataReset(pColData, pColVal->cid, pColVal->type);
tColDataReset(pColData, pColVal->cid, pColVal->type, 0);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
if (code) goto _err;
@ -1240,7 +1078,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _err;
tColDataReset(pColData, pColVal->cid, pColVal->type);
tColDataReset(pColData, pColVal->cid, pColVal->type, 0);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
if (code) goto _err;