diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 98a5041830..78eeff2e5a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,26 +32,27 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct TSDBROW TSDBROW; -typedef struct TSDBKEY TSDBKEY; -typedef struct TABLEID TABLEID; -typedef struct KEYINFO KEYINFO; -typedef struct SDelData SDelData; -typedef struct SDelIdx SDelIdx; -typedef struct STbData STbData; -typedef struct SMemTable SMemTable; -typedef struct STbDataIter STbDataIter; -typedef struct SMergeInfo SMergeInfo; -typedef struct STable STable; -typedef struct SMapData SMapData; -typedef struct SBlockSMA SBlockSMA; -typedef struct SBlockIdx SBlockIdx; -typedef struct SBlock SBlock; -typedef struct SBlockStatis SBlockStatis; -typedef struct SAggrBlkCol SAggrBlkCol; -typedef struct SColData SColData; -typedef struct SBlockData SBlockData; -typedef struct SReadH SReadH; +typedef struct TSDBROW TSDBROW; +typedef struct TSDBKEY TSDBKEY; +typedef struct TABLEID TABLEID; +typedef struct KEYINFO KEYINFO; +typedef struct SDelData SDelData; +typedef struct SDelIdx SDelIdx; +typedef struct STbData STbData; +typedef struct SMemTable SMemTable; +typedef struct STbDataIter STbDataIter; +typedef struct SMergeInfo SMergeInfo; +typedef struct STable STable; +typedef struct SMapData SMapData; +typedef struct SBlockSMA SBlockSMA; +typedef struct SBlockIdx SBlockIdx; +typedef struct SBlock SBlock; +typedef struct SBlockStatis SBlockStatis; +typedef struct SAggrBlkCol SAggrBlkCol; +typedef struct SColData SColData; +typedef struct SBlockDataHdr SBlockDataHdr; +typedef struct SBlockData SBlockData; +typedef struct SReadH SReadH; #define TSDB_MAX_SUBBLOCKS 8 @@ -105,8 +106,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx, - SBlock *pBlock); +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, + SBlockIdx *pBlockIdx, SBlock *pBlock); int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); // SDataFReader @@ -343,12 +344,11 @@ struct SBlock { int32_t nRow; int8_t last; int8_t hasDup; + int8_t cmprAlg; int8_t nSubBlock; SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; }; -int a = sizeof(SBlock); - struct SAggrBlkCol { int16_t colId; int16_t maxIndex; @@ -437,6 +437,12 @@ struct SBlockSMA { SColSMA *aColSMA; }; +struct SBlockDataHdr { + uint8_t delimiter; + int64_t suid; + int64_t uid; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 84c63a8192..cdba45c877 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -721,20 +721,110 @@ _err: return code; } -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx, - SBlock *pBlock) { - int32_t code = 0; - SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; - SBlockCol bCol; +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, + SBlockIdx *pBlockIdx, SBlock *pBlock) { + int32_t code = 0; + SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; + SBlockCol bCol; + int64_t size; + int64_t n; + TdFilePtr *pFileFD = pWriter->pDataFD; // TODO + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + TSCKSUM cksm; + uint8_t *p; + int64_t offset; + + pSubBlock->offset = 0; // TODO: set as file offset - pSubBlock->offset = 0; - pSubBlock->ksize = 0; pSubBlock->bsize = 0; - tMapDataClear(&pSubBlock->mBlockCol); + + // HDR + n = taosWriteFile(pFileFD, &hdr, sizeof(hdr)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pSubBlock->bsize += n; // TSDBKEY + pSubBlock->ksize = 0; + if (pBlock->cmprAlg == NO_COMPRESSION) { + // TSKEY + size = sizeof(TSKEY) * pBlockData->nRow; + n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pSubBlock->ksize += size; + cksm = taosCalcChecksum(0, (uint8_t *)pBlockData->aTSKEY, size); + + // version + size = sizeof(int64_t) * pBlockData->nRow; + n = taosWriteFile(pFileFD, pBlockData->aVersion, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pSubBlock->ksize += size; + cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, size); + + // cksm + size = sizeof(cksm); + n = taosWriteFile(pFileFD, (uint8_t *)&cksm, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pSubBlock->ksize += size; + } else { + ASSERT(pBlock->cmprAlg == ONE_STAGE_COMP || pBlock->cmprAlg == TWO_STAGE_COMP); + + size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); + + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + if (pBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + } + + // TSKEY + n = tsCompressTimestamp(pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, size, + pBlock->cmprAlg, *ppBuf2, size); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + pSubBlock->ksize += n; + + // version + n = tsCompressBigint(pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, + *ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, pBlock->cmprAlg, *ppBuf2, size); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + pSubBlock->ksize += n; + + // cksm + pSubBlock->ksize += sizeof(TSCKSUM); + ASSERT(pSubBlock->ksize <= size); + taosCalcChecksumAppend(0, *ppBuf1, pSubBlock->ksize); + + // write + n = taosWriteFile(pFileFD, *ppBuf1, pSubBlock->ksize); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + pSubBlock->bsize += pSubBlock->ksize; // other columns + offset = 0; + tMapDataClear(&pSubBlock->mBlockCol); for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) { SColData *pColData = &pBlockData->aColData[iCol]; @@ -747,14 +837,77 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ bCol.flag = pColData->flags; if (pColData->flags != HAS_NULL) { + cksm = 0; + bCol.offset = offset; + bCol.size = 0; + + // bitmap if (pColData->flags != HAS_VALUE) { - // handle bitmap + // TODO: optimize bitmap part + n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + cksm = taosCalcChecksum(cksm, pColData->pBitMap, n); + bCol.size += n; } - // handle real data + // data + if (pBlock->cmprAlg == NO_COMPRESSION) { + // data + n = taosWriteFile(pFileFD, pColData->pData, pColData->nData); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + bCol.size += n; - // bCol.offset = ; - // bCol.size = ; + // checksum + cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData); + n = taosWriteFile(pFileFD, &cksm, sizeof(cksm)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + bCol.size += n; + } else { + size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM); + + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + if (pBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + } + + // data + n = tDataTypes->compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, pBlock->cmprAlg, + *ppBuf2, size); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + + // cksm + n += sizeof(TSCKSUM); + ASSERT(n <= size); + taosCalcChecksumAppend(cksm, *ppBuf1, n); + bCol.size += n; + + // write + n = taosWriteFile(pFileFD, *ppBuf1, bCol.size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + + // state + offset += bCol.size; + pSubBlock->bsize += bCol.size; } code = tMapDataPutItem(&pSubBlock->mBlockCol, &bCol, tPutBlockCol); @@ -764,7 +917,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ return code; _err: - tsdbError("vgId:%d write block data failed since %s", pWriter->pTsdb, tstrerror(code)); + tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 181208f73c..74f2db33aa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -443,8 +443,11 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK // TSDBROW ====================================================== TSDBKEY tsdbRowKey(TSDBROW *pRow) { - // TODO: support SBlockData version + // if (pRow->type == 0) { return (TSDBKEY){.version = pRow->version, .ts = pRow->pTSRow->ts}; + // } else { + // return (TSDBKEY){.version = pRow->pBlockData->aVersion[pRow->iRow], .ts = pRow->pBlockData->aTSKEY[pRow->iRow]}; + // } } void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {