more work

This commit is contained in:
Hongze Cheng 2022-06-27 05:30:10 +00:00
parent 9dd060b439
commit 4b5b3bad11
6 changed files with 299 additions and 144 deletions

View File

@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT})
endif(${BUILD_WITH_TRAFT}) endif(${BUILD_WITH_TRAFT})
add_subdirectory(tdev) add_subdirectory(tdev)
add_subdirectory(lz4)

View File

@ -0,0 +1,6 @@
add_executable(lz4_test "")
target_sources(lz4_test
PRIVATE
"main.c"
)
target_link_libraries(lz4_test lz4_static)

8
contrib/test/lz4/main.c Normal file
View File

@ -0,0 +1,8 @@
#include <stdio.h>
#include "lz4.h"
int main(int argc, char const *argv[]) {
printf("%d\n", LZ4_compressBound(1024));
return 0;
}

View File

@ -140,6 +140,7 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2);
int32_t tBlockDataInit(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
@ -364,14 +365,17 @@ typedef struct {
int8_t type; int8_t type;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int64_t offset; int64_t offset;
int64_t size; int64_t bsize; // bitmap size
int64_t csize; // compressed column value size
int64_t osize; // original column value size (only save for variant data type)
} SBlockCol; } SBlockCol;
typedef struct { typedef struct {
int64_t nRow; int64_t nRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t offset; int64_t offset;
int64_t ksize; int64_t vsize; // VERSION size
int64_t ksize; // TSKEY size
int64_t bsize; int64_t bsize;
SMapData mBlockCol; // SMapData<SBlockCol> SMapData mBlockCol; // SMapData<SBlockCol>
} SSubBlock; } SSubBlock;

View File

@ -599,91 +599,199 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
return code; return code;
} }
static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
uint8_t *p;
int64_t size;
int64_t n;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SBlockCol *pBlockCol = &(SBlockCol){};
// realloc
code = tsdbRealloc(ppBuf1, pSubBlock->bsize);
if (code) goto _err;
// seek
n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSubBlock->bsize) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
p = *ppBuf1;
SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
ASSERT(pHdr->suid == pBlockIdx->suid);
ASSERT(pHdr->uid == pBlockIdx->uid);
p += sizeof(*pHdr);
if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) continue;
if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
}
// recover
pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(*pHdr);
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t));
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY));
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION
memcpy(pBlockData->aVersion, p, pSubBlock->vsize);
// TSKEY
memcpy(pBlockData->aTSKEY, p + pSubBlock->vsize, pSubBlock->ksize);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// VERSION
n = tsDecompressBigint(p, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// TSKEY
n = tsDecompressTimestamp(p + pSubBlock->vsize, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
}
p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
SColData *pColData;
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
continue;
}
pColData->nVal = pSubBlock->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
if (pBlockCol->flag != HAS_VALUE) {
size = BIT2_SIZE(pSubBlock->nRow);
code = tsdbRealloc(&pColData->pBitMap, size);
if (code) goto _err;
ASSERT(pBlockCol->bsize == size);
memcpy(pColData->pBitMap, p, size);
} else {
ASSERT(pBlockCol->bsize == 0);
}
p = p + pBlockCol->bsize;
// value
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
pColData->nData = pBlockCol->osize;
} else {
pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
}
code = tsdbRealloc(&pColData->pData, pColData->nData);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
memcpy(pColData->pData, p, pColData->nData);
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
n = tDataTypes[pBlockCol->type].decompFunc(p, pBlockCol->csize, pSubBlock->nRow, pColData->pData, pColData->nData,
pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == pColData->nData);
}
p = p + pBlockCol->csize + sizeof(TSCKSUM);
}
// TODO
return code;
_err:
tsdbError("vgId:%d tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2) { uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0; int32_t code = 0;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
uint8_t *pBuf1 = NULL; uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL; uint8_t *pBuf2 = NULL;
SBlockCol *pBlockCol = &(SBlockCol){}; int32_t iSubBlock;
if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2; if (!ppBuf2) ppBuf2 = &pBuf2;
for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { // read the first sub-block
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; iSubBlock = 0;
uint8_t *p; code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
int64_t n; if (code) goto _err;
// realloc // read remain block data and do merg
code = tsdbRealloc(ppBuf1, pSubBlock->bsize); iSubBlock++;
if (code) goto _err; for (; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
ASSERT(0);
// seek
n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSubBlock->bsize) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
p = *ppBuf1;
SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
ASSERT(pHdr->suid == pBlockIdx->suid);
ASSERT(pHdr->uid == pBlockIdx->uid);
p += sizeof(*pHdr);
if (!taosCheckChecksumWhole(p, pSubBlock->ksize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += pSubBlock->ksize;
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) continue;
if (!taosCheckChecksumWhole(p, pBlockCol->size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += pBlockCol->size;
}
// recover
pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(*pHdr);
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t));
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY));
if (code) goto _err;
p += pSubBlock->ksize;
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
if (pBlockCol->flag == HAS_NONE) {
// All NULL value
} else {
// decompress
p += pBlockCol->size;
}
}
} }
if (pBuf1) tsdbFree(pBuf1); if (pBuf1) tsdbFree(pBuf1);
@ -1134,27 +1242,26 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pSubBlock->bsize += n; pSubBlock->bsize += n;
// TSDBKEY // TSDBKEY
pSubBlock->ksize = 0;
if (cmprAlg == NO_COMPRESSION) { if (cmprAlg == NO_COMPRESSION) {
// TSKEY cksm = 0;
size = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->ksize += size;
cksm = taosCalcChecksum(0, (uint8_t *)pBlockData->aTSKEY, size);
// version // version
size = sizeof(int64_t) * pBlockData->nRow; pSubBlock->vsize = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, size); n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->vsize);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pSubBlock->ksize += size; cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->vsize);
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, size);
// TSKEY
pSubBlock->ksize = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->ksize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->ksize);
// cksm // cksm
size = sizeof(cksm); size = sizeof(cksm);
@ -1163,11 +1270,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pSubBlock->ksize += size;
} else { } else {
ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP); ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP);
size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); size = (sizeof(int64_t) + sizeof(TSKEY)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size); code = tsdbRealloc(ppBuf1, size);
if (code) goto _err; if (code) goto _err;
@ -1177,37 +1283,37 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (code) goto _err; if (code) goto _err;
} }
// TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1,
size, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->ksize += n;
// version // version
n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, *ppBuf1,
*ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, cmprAlg, *ppBuf2, size); size, cmprAlg, *ppBuf2, size);
if (n <= 0) { if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _err; goto _err;
} }
pSubBlock->ksize += n; pSubBlock->vsize = n;
// TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->vsize, size - pSubBlock->vsize, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->ksize = n;
// cksm // cksm
pSubBlock->ksize += sizeof(TSCKSUM); n = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
ASSERT(pSubBlock->ksize <= size); ASSERT(n <= size);
taosCalcChecksumAppend(0, *ppBuf1, pSubBlock->ksize); taosCalcChecksumAppend(0, *ppBuf1, n);
// write // write
n = taosWriteFile(pFileFD, *ppBuf1, pSubBlock->ksize); n = taosWriteFile(pFileFD, *ppBuf1, n);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
} }
pSubBlock->bsize += pSubBlock->ksize; pSubBlock->bsize += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
// other columns // other columns
offset = 0; offset = 0;
@ -1226,19 +1332,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (pColData->flag != HAS_NULL) { if (pColData->flag != HAS_NULL) {
cksm = 0; cksm = 0;
pBlockCol->offset = offset; pBlockCol->offset = offset;
pBlockCol->size = 0;
// bitmap // bitmap
if (pColData->flag != HAS_VALUE) { if (pColData->flag == HAS_VALUE) {
// optimize bitmap storage (todo) pBlockCol->bsize = 0;
n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow)); } else {
pBlockCol->bsize = BIT2_SIZE(pBlockData->nRow);
n = taosWriteFile(pFileFD, pColData->pBitMap, pBlockCol->bsize);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
cksm = taosCalcChecksum(cksm, pColData->pBitMap, n); cksm = taosCalcChecksum(cksm, pColData->pBitMap, n);
pBlockCol->size += n;
} }
// data // data
@ -1249,7 +1354,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pBlockCol->size += n; pBlockCol->csize = n;
pBlockCol->osize = n;
// checksum // checksum
cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData); cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData);
@ -1258,7 +1364,6 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pBlockCol->size += n;
} else { } else {
size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM); size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM);
@ -1277,6 +1382,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _err; goto _err;
} }
pBlockCol->csize = n;
pBlockCol->osize = pColData->nData;
// cksm // cksm
n += sizeof(TSCKSUM); n += sizeof(TSCKSUM);
@ -1289,13 +1396,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pBlockCol->size += n;
} }
// state // state
offset += pBlockCol->size; offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
pSubBlock->bsize += pBlockCol->size; pSubBlock->bsize = pSubBlock->bsize + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
} }
code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol); code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol);

View File

@ -384,6 +384,7 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].vsize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize);
n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol); n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol);
@ -408,6 +409,7 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].vsize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize);
n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol); n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol);
@ -443,7 +445,13 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tPutI64v(p ? p + n : p, pBlockCol->offset); n += tPutI64v(p ? p + n : p, pBlockCol->offset);
n += tPutI64v(p ? p + n : p, pBlockCol->size); if (pBlockCol->flag != HAS_VALUE) {
n += tPutI64v(p ? p + n : p, pBlockCol->bsize);
}
n += tPutI64v(p ? p + n : p, pBlockCol->csize);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tPutI64v(p ? p + n : p, pBlockCol->osize);
}
} }
return n; return n;
@ -461,7 +469,17 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tGetI64v(p + n, &pBlockCol->offset); n += tGetI64v(p + n, &pBlockCol->offset);
n += tGetI64v(p + n, &pBlockCol->size); if (pBlockCol->flag != HAS_VALUE) {
n += tGetI64v(p + n, &pBlockCol->bsize);
} else {
pBlockCol->bsize = 0;
}
n += tGetI64v(p + n, &pBlockCol->csize);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tGetI64v(p + n, &pBlockCol->osize);
} else {
pBlockCol->osize = -1;
}
} }
return n; return n;
@ -1039,24 +1057,30 @@ void tBlockDataClear(SBlockData *pBlockData) {
taosArrayDestroyEx(pBlockData->aColData, tColDataClear); taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
} }
static SColData *tBlockDataAddBlockCol(SBlockData *pBlockData, int32_t iColData, int16_t cid, int8_t type) { int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) {
int32_t code = 0;
SColData *pColData = NULL; SColData *pColData = NULL;
int32_t idx = taosArrayGetSize(pBlockData->aColDataP); int32_t idx = taosArrayGetSize(pBlockData->aColDataP);
if (idx >= taosArrayGetSize(pBlockData->aColData)) { if (idx >= taosArrayGetSize(pBlockData->aColData)) {
if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) return NULL; if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
} }
pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx); pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx);
tColDataReset(pColData, cid, type);
if (taosArrayInsert(pBlockData->aColDataP, iColData, &pColData) == NULL) return NULL; if (taosArrayInsert(pBlockData->aColDataP, iColData, &pColData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
// append NONE goto _err;
for (int32_t i = 0; i < pBlockData->nRow; i++) {
if (tColDataAppendValue(pColData, &COL_VAL_NONE(cid, type)) != 0) return NULL;
} }
return pColData; *ppColData = pColData;
return code;
_err:
*ppColData = NULL;
return code;
} }
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
@ -1092,10 +1116,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid, pColData->type))); code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid, pColData->type)));
if (code) goto _err; if (code) goto _err;
} else { } else {
pColData = tBlockDataAddBlockCol(pBlockData, iColData, pColVal->cid, pColVal->type); code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (pColData == NULL) { if (code) goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; // append a NONE
tColDataReset(pColData, pColVal->cid, pColVal->type);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
if (code) goto _err;
} }
code = tColDataAppendValue(pColData, pColVal); code = tColDataAppendValue(pColData, pColVal);
@ -1119,10 +1147,13 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
} }
while (pColVal) { while (pColVal) {
pColData = tBlockDataAddBlockCol(pBlockData, iColData, pColVal->cid, pColVal->type); code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (pColData == NULL) { if (code) goto _err;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; tColDataReset(pColData, pColVal->cid, pColVal->type);
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type));
if (code) goto _err;
} }
code = tColDataAppendValue(pColData, pColVal); code = tColDataAppendValue(pColData, pColVal);