diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index eacaeb9524..f35cf0d13d 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT}) endif(${BUILD_WITH_TRAFT}) add_subdirectory(tdev) +add_subdirectory(lz4) diff --git a/contrib/test/lz4/CMakeLists.txt b/contrib/test/lz4/CMakeLists.txt new file mode 100644 index 0000000000..92ac2aa5b2 --- /dev/null +++ b/contrib/test/lz4/CMakeLists.txt @@ -0,0 +1,6 @@ +add_executable(lz4_test "") +target_sources(lz4_test + PRIVATE + "main.c" +) +target_link_libraries(lz4_test lz4_static) \ No newline at end of file diff --git a/contrib/test/lz4/main.c b/contrib/test/lz4/main.c new file mode 100644 index 0000000000..49a6d8da01 --- /dev/null +++ b/contrib/test/lz4/main.c @@ -0,0 +1,8 @@ +#include + +#include "lz4.h" + +int main(int argc, char const *argv[]) { + printf("%d\n", LZ4_compressBound(1024)); + return 0; +} diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d48e9df522..aea5b3ccc1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -140,6 +140,7 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2); int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); +int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); @@ -364,14 +365,17 @@ typedef struct { int8_t type; int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE int64_t offset; - int64_t size; + int64_t bsize; // bitmap size + int64_t csize; // compressed column value size + int64_t osize; // original column value size (only save for variant data type) } SBlockCol; typedef struct { int64_t nRow; int8_t cmprAlg; int64_t offset; - int64_t ksize; + int64_t vsize; // VERSION size + int64_t ksize; // TSKEY size int64_t bsize; SMapData mBlockCol; // SMapData } SSubBlock; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index b8bef3211b..b88a88de59 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -599,91 +599,199 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl return code; } +static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, + SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { + int32_t code = 0; + uint8_t *p; + int64_t size; + int64_t n; + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + SBlockCol *pBlockCol = &(SBlockCol){}; + + // realloc + code = tsdbRealloc(ppBuf1, pSubBlock->bsize); + if (code) goto _err; + + // seek + n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < pSubBlock->bsize) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + p = *ppBuf1; + SBlockDataHdr *pHdr = (SBlockDataHdr *)p; + ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); + ASSERT(pHdr->suid == pBlockIdx->suid); + ASSERT(pHdr->uid == pBlockIdx->uid); + p += sizeof(*pHdr); + + if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); + + for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { + tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); + + ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); + + if (pBlockCol->flag == HAS_NULL) continue; + + if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + } + + // recover + pBlockData->nRow = pSubBlock->nRow; + p = *ppBuf1 + sizeof(*pHdr); + + code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t)); + if (code) goto _err; + code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY)); + if (code) goto _err; + if (pSubBlock->cmprAlg == NO_COMPRESSION) { + ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow); + ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow); + + // VERSION + memcpy(pBlockData->aVersion, p, pSubBlock->vsize); + + // TSKEY + memcpy(pBlockData->aTSKEY, p + pSubBlock->vsize, pSubBlock->ksize); + } else { + size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; + if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + } + + // VERSION + n = tsDecompressBigint(p, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion, + sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size); + if (n < 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + + // TSKEY + n = tsDecompressTimestamp(p + pSubBlock->vsize, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY, + sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size); + if (n < 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + } + p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); + + for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { + SColData *pColData; + + tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); + ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); + + code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData); + if (code) goto _err; + + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + if (pBlockCol->flag == HAS_NULL) { + for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); + if (code) goto _err; + } + continue; + } + pColData->nVal = pSubBlock->nRow; + pColData->flag = pBlockCol->flag; + + // bitmap + if (pBlockCol->flag != HAS_VALUE) { + size = BIT2_SIZE(pSubBlock->nRow); + code = tsdbRealloc(&pColData->pBitMap, size); + if (code) goto _err; + + ASSERT(pBlockCol->bsize == size); + + memcpy(pColData->pBitMap, p, size); + } else { + ASSERT(pBlockCol->bsize == 0); + } + p = p + pBlockCol->bsize; + + // value + if (IS_VAR_DATA_TYPE(pBlockCol->type)) { + pColData->nData = pBlockCol->osize; + } else { + pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow; + } + code = tsdbRealloc(&pColData->pData, pColData->nData); + if (code) goto _err; + + if (pSubBlock->cmprAlg == NO_COMPRESSION) { + memcpy(pColData->pData, p, pColData->nData); + } else { + size = pColData->nData + COMP_OVERFLOW_BYTES; + if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + } + + n = tDataTypes[pBlockCol->type].decompFunc(p, pBlockCol->csize, pSubBlock->nRow, pColData->pData, pColData->nData, + pSubBlock->cmprAlg, *ppBuf2, size); + if (n < 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + + ASSERT(n == pColData->nData); + } + p = p + pBlockCol->csize + sizeof(TSCKSUM); + } + + // TODO + return code; + +_err: + tsdbError("vgId:%d tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - SBlockCol *pBlockCol = &(SBlockCol){}; + int32_t code = 0; + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + int32_t iSubBlock; if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; - for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - uint8_t *p; - int64_t n; + // read the first sub-block + iSubBlock = 0; + code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + if (code) goto _err; - // realloc - code = tsdbRealloc(ppBuf1, pSubBlock->bsize); - if (code) goto _err; - - // seek - n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < pSubBlock->bsize) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - p = *ppBuf1; - SBlockDataHdr *pHdr = (SBlockDataHdr *)p; - ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); - ASSERT(pHdr->suid == pBlockIdx->suid); - ASSERT(pHdr->uid == pBlockIdx->uid); - p += sizeof(*pHdr); - - if (!taosCheckChecksumWhole(p, pSubBlock->ksize)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - p += pSubBlock->ksize; - - for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); - - ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - if (pBlockCol->flag == HAS_NULL) continue; - - if (!taosCheckChecksumWhole(p, pBlockCol->size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - p += pBlockCol->size; - } - - // recover - pBlockData->nRow = pSubBlock->nRow; - p = *ppBuf1 + sizeof(*pHdr); - - code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t)); - if (code) goto _err; - code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY)); - if (code) goto _err; - p += pSubBlock->ksize; - - for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); - - if (pBlockCol->flag == HAS_NONE) { - // All NULL value - } else { - // decompress - p += pBlockCol->size; - } - } + // read remain block data and do merg + iSubBlock++; + for (; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + ASSERT(0); } if (pBuf1) tsdbFree(pBuf1); @@ -1134,27 +1242,26 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ pSubBlock->bsize += n; // TSDBKEY - pSubBlock->ksize = 0; if (cmprAlg == NO_COMPRESSION) { - // TSKEY - size = sizeof(TSKEY) * pBlockData->nRow; - n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - pSubBlock->ksize += size; - cksm = taosCalcChecksum(0, (uint8_t *)pBlockData->aTSKEY, size); + cksm = 0; // version - size = sizeof(int64_t) * pBlockData->nRow; - n = taosWriteFile(pFileFD, pBlockData->aVersion, size); + pSubBlock->vsize = sizeof(int64_t) * pBlockData->nRow; + n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->vsize); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pSubBlock->ksize += size; - cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, size); + cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->vsize); + + // TSKEY + pSubBlock->ksize = sizeof(TSKEY) * pBlockData->nRow; + n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->ksize); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->ksize); // cksm size = sizeof(cksm); @@ -1163,11 +1270,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pSubBlock->ksize += size; } else { ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP); - size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); + size = (sizeof(int64_t) + sizeof(TSKEY)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); code = tsdbRealloc(ppBuf1, size); if (code) goto _err; @@ -1177,37 +1283,37 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (code) goto _err; } - // TSKEY - n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, - size, cmprAlg, *ppBuf2, size); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - pSubBlock->ksize += n; - // version - n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, - *ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, cmprAlg, *ppBuf2, size); + n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, + size, cmprAlg, *ppBuf2, size); if (n <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; } - pSubBlock->ksize += n; + pSubBlock->vsize = n; + + // TSKEY + n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, + *ppBuf1 + pSubBlock->vsize, size - pSubBlock->vsize, cmprAlg, *ppBuf2, size); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + pSubBlock->ksize = n; // cksm - pSubBlock->ksize += sizeof(TSCKSUM); - ASSERT(pSubBlock->ksize <= size); - taosCalcChecksumAppend(0, *ppBuf1, pSubBlock->ksize); + n = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); + ASSERT(n <= size); + taosCalcChecksumAppend(0, *ppBuf1, n); // write - n = taosWriteFile(pFileFD, *ppBuf1, pSubBlock->ksize); + n = taosWriteFile(pFileFD, *ppBuf1, n); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } } - pSubBlock->bsize += pSubBlock->ksize; + pSubBlock->bsize += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); // other columns offset = 0; @@ -1226,19 +1332,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (pColData->flag != HAS_NULL) { cksm = 0; pBlockCol->offset = offset; - pBlockCol->size = 0; // bitmap - if (pColData->flag != HAS_VALUE) { - // optimize bitmap storage (todo) - n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow)); + if (pColData->flag == HAS_VALUE) { + pBlockCol->bsize = 0; + } else { + pBlockCol->bsize = BIT2_SIZE(pBlockData->nRow); + n = taosWriteFile(pFileFD, pColData->pBitMap, pBlockCol->bsize); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - cksm = taosCalcChecksum(cksm, pColData->pBitMap, n); - pBlockCol->size += n; } // data @@ -1249,7 +1354,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pBlockCol->size += n; + pBlockCol->csize = n; + pBlockCol->osize = n; // checksum cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData); @@ -1258,7 +1364,6 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - pBlockCol->size += n; } else { size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM); @@ -1277,6 +1382,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TSDB_CODE_COMPRESS_ERROR; goto _err; } + pBlockCol->csize = n; + pBlockCol->osize = pColData->nData; // cksm n += sizeof(TSCKSUM); @@ -1289,13 +1396,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = TAOS_SYSTEM_ERROR(errno); goto _err; } - - pBlockCol->size += n; } // state - offset += pBlockCol->size; - pSubBlock->bsize += pBlockCol->size; + offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + pSubBlock->bsize = pSubBlock->bsize + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); } code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c6feb2aabf..b1c20a4340 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -384,6 +384,7 @@ int32_t tPutBlock(uint8_t *p, void *ph) { n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].vsize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize); n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol); @@ -408,6 +409,7 @@ int32_t tGetBlock(uint8_t *p, void *ph) { n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].vsize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize); n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol); @@ -443,7 +445,13 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { if (pBlockCol->flag != HAS_NULL) { n += tPutI64v(p ? p + n : p, pBlockCol->offset); - n += tPutI64v(p ? p + n : p, pBlockCol->size); + if (pBlockCol->flag != HAS_VALUE) { + n += tPutI64v(p ? p + n : p, pBlockCol->bsize); + } + n += tPutI64v(p ? p + n : p, pBlockCol->csize); + if (IS_VAR_DATA_TYPE(pBlockCol->type)) { + n += tPutI64v(p ? p + n : p, pBlockCol->osize); + } } return n; @@ -461,7 +469,17 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { if (pBlockCol->flag != HAS_NULL) { n += tGetI64v(p + n, &pBlockCol->offset); - n += tGetI64v(p + n, &pBlockCol->size); + if (pBlockCol->flag != HAS_VALUE) { + n += tGetI64v(p + n, &pBlockCol->bsize); + } else { + pBlockCol->bsize = 0; + } + n += tGetI64v(p + n, &pBlockCol->csize); + if (IS_VAR_DATA_TYPE(pBlockCol->type)) { + n += tGetI64v(p + n, &pBlockCol->osize); + } else { + pBlockCol->osize = -1; + } } return n; @@ -1039,24 +1057,30 @@ void tBlockDataClear(SBlockData *pBlockData) { taosArrayDestroyEx(pBlockData->aColData, tColDataClear); } -static SColData *tBlockDataAddBlockCol(SBlockData *pBlockData, int32_t iColData, int16_t cid, int8_t type) { +int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) { + int32_t code = 0; SColData *pColData = NULL; int32_t idx = taosArrayGetSize(pBlockData->aColDataP); if (idx >= taosArrayGetSize(pBlockData->aColData)) { - if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) return NULL; + if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx); - tColDataReset(pColData, cid, type); - if (taosArrayInsert(pBlockData->aColDataP, iColData, &pColData) == NULL) return NULL; - - // append NONE - for (int32_t i = 0; i < pBlockData->nRow; i++) { - if (tColDataAppendValue(pColData, &COL_VAL_NONE(cid, type)) != 0) return NULL; + if (taosArrayInsert(pBlockData->aColDataP, iColData, &pColData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } - return pColData; + *ppColData = pColData; + return code; + +_err: + *ppColData = NULL; + return code; } int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { @@ -1092,10 +1116,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS code = tColDataAppendValue(pColData, &(COL_VAL_NONE(pColData->cid, pColData->type))); if (code) goto _err; } else { - pColData = tBlockDataAddBlockCol(pBlockData, iColData, pColVal->cid, pColVal->type); - if (pColData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _err; + + // append a NONE + tColDataReset(pColData, pColVal->cid, pColVal->type); + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type)); + if (code) goto _err; } code = tColDataAppendValue(pColData, pColVal); @@ -1119,10 +1147,13 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS } while (pColVal) { - pColData = tBlockDataAddBlockCol(pBlockData, iColData, pColVal->cid, pColVal->type); - if (pColData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _err; + + tColDataReset(pColData, pColVal->cid, pColVal->type); + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type)); + if (code) goto _err; } code = tColDataAppendValue(pColData, pColVal);