diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5bbc0d45f1..a824f8f0b3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -182,8 +182,12 @@ int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, int32_t *szOut, uint8_t **ppBuf); +int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut, + uint8_t **ppBuf); int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut, uint8_t **ppBuf); +int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, + uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); @@ -254,6 +258,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); +#if 0 int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, @@ -262,6 +267,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl uint8_t **ppBuf2); int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); +#endif // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index faabf60f71..125e1bcb73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -677,6 +677,159 @@ _err: return code; } +int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) { + int32_t code = 0; + SSmaInfo *pSmaInfo = &pBlock->smaInfo; + + ASSERT(pSmaInfo->size > 0); + + taosArrayClear(aColumnDataAgg); + + // alloc + int32_t size = pSmaInfo->size + sizeof(TSCKSUM); + code = tRealloc(&pReader->pBuf1, size); + if (code) goto _err; + + // read + int64_t n = taosReadFile(pReader->pSmaFD, pReader->pBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(pReader->pBuf1, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = 0; + while (n < pSmaInfo->size) { + SColumnDataAgg sma; + + n += tGetColumnDataAgg(pReader->pBuf1 + n, &sma); + if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + return code; + +_err: + tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, int16_t *aColId, + int32_t nColId, SBlockData *pBlockData) { + int32_t code = 0; + + tBlockDataReset(pBlockData); + + TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; + + // seek + int64_t n = taosLSeekFile(pFD, pBlkInfo->offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + code = tRealloc(&pReader->pBuf1, pBlkInfo->szBlock); + if (code) goto _err; + + n = taosReadFile(pFD, pReader->pBuf1, pBlkInfo->szBlock); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < pBlkInfo->szBlock) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + uint8_t *p = pReader->pBuf1; + // check & decode + SDiskDataHdr hdr; + if (!taosCheckChecksumWhole(p, pBlkInfo->szKey)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + p += tGetDiskDataHdr(p, &hdr); + + tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid); + pBlockData->nRow = hdr.nRow; + + if (hdr.uid == 0) { + ASSERT(hdr.szUid); + code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, + sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); + if (code) goto _err; + } else { + ASSERT(hdr.szUid == 0); + } + p += hdr.szUid; + + code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, + sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); + if (code) goto _err; + p += hdr.szVer; + + code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, + sizeof(TSKEY) * hdr.nRow, &pReader->pBuf2); + if (code) goto _err; + p += hdr.szKey; + p += sizeof(TSCKSUM); + + // SBlockCol + if (hdr.szBlkCol > 0) { + if (!taosCheckChecksumWhole(p, hdr.szBlkCol + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + int32_t iColData = 0; + uint8_t *pt = p + hdr.szBlkCol + sizeof(TSCKSUM); + n = 0; + while (n < hdr.szBlkCol) { + SBlockCol blockCol; + + n += tGetBlockCol(p + n, &blockCol); + + ASSERT(blockCol.flag && blockCol.flag != HAS_NONE); + + SColData *pColData; + code = tBlockDataAddColData(pBlockData, iColData, &pColData); + if (code) goto _err; + iColData++; + + tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn); + + if (blockCol.flag == HAS_NULL) { + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type)); + if (code) goto _err; + } + } else { + code = tsdbDecmprColData(pt + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, pReader->pBuf2); + if (code) goto _err; + } + } + } + + return code; + +_err: + tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + +#if 0 static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; #if 0 @@ -1350,54 +1503,7 @@ _err: #endif return code; } - -int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) { - int32_t code = 0; - SSmaInfo *pSmaInfo = &pBlock->smaInfo; - - ASSERT(pSmaInfo->size > 0); - - taosArrayClear(aColumnDataAgg); - - // alloc - int32_t size = pSmaInfo->size + sizeof(TSCKSUM); - code = tRealloc(&pReader->pBuf1, size); - if (code) goto _err; - - // read - int64_t n = taosReadFile(pReader->pSmaFD, pReader->pBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->pBuf1, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // decode - n = 0; - while (n < pSmaInfo->size) { - SColumnDataAgg sma; - - n += tGetColumnDataAgg(pReader->pBuf1 + n, &sma); - if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - return code; - -_err: - tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} +#endif // SDataFWriter ==================================================== int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index d669c28960..dbb35532e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1751,6 +1751,36 @@ _exit: return code; } +int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut, + uint8_t **ppBuf) { + int32_t code = 0; + + code = tRealloc(ppOut, szOut); + if (code) goto _exit; + + if (cmprAlg == NO_COMPRESSION) { + ASSERT(szIn == szOut); + memcpy(*ppOut, pIn, szOut); + } else { + if (cmprAlg == TWO_STAGE_COMP) { + code = tRealloc(ppBuf, szOut + COMP_OVERFLOW_BYTES); + if (code) goto _exit; + } + + int32_t size = tDataTypes[type].decompFunc(pIn, szIn, szOut / tDataTypes[type].bytes, *ppOut, szOut, cmprAlg, + *ppBuf, szOut + COMP_OVERFLOW_BYTES); + if (size <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _exit; + } + + ASSERT(size == szOut); + } + +_exit: + return code; +} + int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut, uint8_t **ppBuf) { int32_t code = 0; @@ -1793,3 +1823,48 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol _exit: return code; } + +int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, + uint8_t **ppBuf) { + int32_t code = 0; + + int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); + if (!taosCheckChecksumWhole(pIn, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + pColData->cid = pBlockCol->cid; + pColData->type = pBlockCol->type; + pColData->smaOn = pBlockCol->smaOn; + pColData->flag = pBlockCol->flag; + pColData->nVal = nVal; + pColData->nData = pBlockCol->szOrigin; + + uint8_t *p = pIn; + // bitmap + if (pBlockCol->szBitmap) { + code = tsdbDecmprData(p, pBlockCol->szBitmap, TSDB_DATA_TYPE_TINYINT, cmprAlg, &pColData->pBitMap, + BIT2_SIZE(pColData->nVal), ppBuf); + if (code) goto _exit; + } + p += pBlockCol->szBitmap; + + // offset + if (pBlockCol->szOffset) { + code = tsdbDecmprData(p, pBlockCol->szOffset, TSDB_DATA_TYPE_INT, cmprAlg, (uint8_t **)&pColData->aOffset, + sizeof(int32_t) * pColData->nVal, ppBuf); + if (code) goto _exit; + } + p += pBlockCol->szOffset; + + // value + if (pBlockCol->szValue) { + code = tsdbDecmprData(p, pBlockCol->szValue, pColData->type, cmprAlg, &pColData->pData, pColData->nData, ppBuf); + if (code) goto _exit; + } + p += pBlockCol->szValue; + +_exit: + return code; +} \ No newline at end of file