diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 71f31494fc..9d7cfc0552 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -117,6 +117,7 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
 // SBlockCol
 int32_t tPutBlockCol(uint8_t *p, void *ph);
 int32_t tGetBlockCol(uint8_t *p, void *ph);
+int32_t tBlockColCmprFn(const void *p1, const void *p2);
 // SBlock
 #define tBlockInit() ((SBlock){0})
 void tBlockReset(SBlock *pBlock);
@@ -229,7 +230,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
 int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
 int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
 int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
-                        SBlockData *pBlockData);
+                        SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
 int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
                           uint8_t **ppBuf1, uint8_t **ppBuf2);
 int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
index a8a5cb076e..c75d7964a1 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
@@ -618,11 +618,248 @@ _err:
   return code;
 }
 
+// Verify and decode the key area (VERSION + TSKEY) of one sub-block into pBlockData->aVersion/aTSKEY.
+//   pBuf  - first byte of the VERSION data, i.e. past the block data header; the checksum is verified over
+//           vsize + ksize + sizeof(TSCKSUM) bytes starting here
+//   ppBuf - scratch buffer handed to the decompressors; grown here only when cmprAlg is TWO_STAGE_COMP
+// Returns 0, TSDB_CODE_FILE_CORRUPTED (bad checksum), TSDB_CODE_COMPRESS_ERROR (decode failure) or the
+// tsdbRealloc() error code.
+static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
+  int32_t code = 0;
+  int64_t size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
+  int64_t n;
+
+  if (!taosCheckChecksumWhole(pBuf, size)) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    goto _err;
+  }
+
+  code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow);
+  if (code) goto _err;
+  code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow);
+  if (code) goto _err;
+
+  if (pSubBlock->cmprAlg == NO_COMPRESSION) {
+    ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
+    ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
+
+    // VERSION
+    memcpy(pBlockData->aVersion, pBuf, pSubBlock->vsize);
+
+    // TSKEY (stored right after VERSION; pBuf is advanced once, so copy from pBuf itself)
+    pBuf = pBuf + pSubBlock->vsize;
+    memcpy(pBlockData->aTSKEY, pBuf, pSubBlock->ksize);
+  } else {
+    size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
+    if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
+      code = tsdbRealloc(ppBuf, size);
+      if (code) goto _err;
+    }
+
+    // VERSION
+    n = tsDecompressBigint(pBuf, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
+                           sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
+    if (n < 0) {
+      code = TSDB_CODE_COMPRESS_ERROR;
+      goto _err;
+    }
+
+    // TSKEY
+    pBuf = pBuf + pSubBlock->vsize;
+    n = tsDecompressTimestamp(pBuf, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
+                              sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
+    if (n < 0) {
+      code = TSDB_CODE_COMPRESS_ERROR;
+      goto _err;
+    }
+  }
+
+  return code;
+
+_err:
+  return code;
+}
+
+// Verify and decode one stored column (optional bitmap followed by values) into pColData.
+//   pBuf  - first byte of the column's bitmap/value area; the checksum is verified over
+//           bsize + csize + sizeof(TSCKSUM) bytes starting here
+//   ppBuf - scratch buffer handed to the decompressor; grown here only when cmprAlg is TWO_STAGE_COMP
+// Columns flagged HAS_NULL must be handled by the caller (asserted below). pBlockData is not used here.
+// Returns 0, TSDB_CODE_FILE_CORRUPTED, TSDB_CODE_COMPRESS_ERROR or the tsdbRealloc() error code.
+static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock, SBlockCol *pBlockCol,
+                                  SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) {
+  int32_t code = 0;
+  int64_t size;
+  int64_t n;
+
+  ASSERT(pBlockCol->flag != HAS_NULL);
+
+  if (!taosCheckChecksumWhole(pBuf, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
+    code = TSDB_CODE_FILE_CORRUPTED;
+    goto _err;
+  }
+
+  pColData->nVal = pSubBlock->nRow;
+  pColData->flag = pBlockCol->flag;
+
+  // bitmap
+  if (pBlockCol->flag != HAS_VALUE) {
+    size = BIT2_SIZE(pSubBlock->nRow);
+    code = tsdbRealloc(&pColData->pBitMap, size);
+    if (code) goto _err;
+
+    ASSERT(pBlockCol->bsize == size);
+
+    memcpy(pColData->pBitMap, pBuf, size);
+  } else {
+    ASSERT(pBlockCol->bsize == 0);
+  }
+  pBuf = pBuf + pBlockCol->bsize;
+
+  // value
+  if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
+    pColData->nData = pBlockCol->osize;
+  } else {
+    pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
+  }
+  code = tsdbRealloc(&pColData->pData, pColData->nData);
+  if (code) goto _err;
+
+  if (pSubBlock->cmprAlg == NO_COMPRESSION) {
+    memcpy(pColData->pData, pBuf, pColData->nData);
+  } else {
+    size = pColData->nData + COMP_OVERFLOW_BYTES;
+    if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
+      code = tsdbRealloc(ppBuf, size);
+      if (code) goto _err;
+    }
+
+    n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->csize, pSubBlock->nRow, pColData->pData,
+                                               pColData->nData, pSubBlock->cmprAlg, *ppBuf, size);
+    if (n < 0) {
+      code = TSDB_CODE_COMPRESS_ERROR;
+      goto _err;
+    }
+
+    ASSERT(n == pColData->nData);
+  }
+
+  return code;
+
+_err:
+  return code;
+}
+
+// Read only the columns listed in aColId[0..nCol) of pBlock, plus the VERSION/TSKEY arrays, into pBlockData.
+// aColId must not start with the primary-key timestamp column (asserted). Column ids for which
+// tMapDataSearch() does not return 0 are skipped. ppBuf1 (raw file bytes) and ppBuf2 (decompression scratch)
+// may be NULL, in which case local buffers are used and freed before returning.
+// NOTE(review): pBlockData is reset for every sub-block, so with nSubBlock > 1 only the last sub-block's
+// rows remain on return - confirm whether callers expect merged data.
 int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
-                        SBlockData *pBlockData) {
+                        SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
   int32_t code = 0;
   TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
-  // TODO
+  uint8_t *pBuf1 = NULL;
+  uint8_t *pBuf2 = NULL;
+
+  ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
+
+  if (!ppBuf1) ppBuf1 = &pBuf1;
+  if (!ppBuf2) ppBuf2 = &pBuf2;
+
+  for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
+    SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
+    int64_t offset;
+    int64_t size;
+    int64_t n;
+
+    tBlockDataReset(pBlockData);
+    pBlockData->nRow = pSubBlock->nRow;
+
+    // TSDBKEY
+    offset = pSubBlock->offset + sizeof(SBlockDataHdr);
+    size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
+    code = tsdbRealloc(ppBuf1, size);
+    if (code) goto _err;
+
+    n = taosLSeekFile(pFD, offset, SEEK_SET);
+    if (n < 0) {
+      code = TAOS_SYSTEM_ERROR(errno);
+      goto _err;
+    }
+
+    n = taosReadFile(pFD, *ppBuf1, size);
+    if (n < 0) {
+      code = TAOS_SYSTEM_ERROR(errno);
+      goto _err;
+    } else if (n < size) {
+      code = TSDB_CODE_FILE_CORRUPTED;
+      goto _err;
+    }
+
+    code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
+    if (code) goto _err;
+
+    // OTHER
+    SBlockCol blockCol;
+    SBlockCol *pBlockCol = &blockCol;
+    SColData *pColData;
+    for (int32_t iCol = 0; iCol < nCol; iCol++) {
+      int16_t cid = aColId[iCol];
+
+      if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
+          0) {
+        code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
+        if (code) goto _err;
+
+        tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
+        if (pBlockCol->flag == HAS_NULL) {
+          for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
+            code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
+            if (code) goto _err;
+          }
+        } else {
+          // NOTE(review): tsdbReadSubBlockData() steps over sizeof(TSCKSUM) after the key area before the
+          // first column; confirm pBlockCol->offset already includes that checksum.
+          offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
+          size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
+
+          code = tsdbRealloc(ppBuf1, size);
+          if (code) goto _err;
+
+          // seek
+          n = taosLSeekFile(pFD, offset, SEEK_SET);
+          if (n < 0) {
+            code = TAOS_SYSTEM_ERROR(errno);
+            goto _err;
+          }
+
+          // read
+          n = taosReadFile(pFD, *ppBuf1, size);
+          if (n < 0) {
+            code = TAOS_SYSTEM_ERROR(errno);
+            goto _err;
+          } else if (n < size) {
+            code = TSDB_CODE_FILE_CORRUPTED;
+            goto _err;
+          }
+
+          code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
+          if (code) goto _err;
+        }
+      }
+    }
+  }
+
+  tsdbFree(pBuf1);
+  tsdbFree(pBuf2);
+  return code;
+
+_err:
+  tsdbError("vgId:%d tsdb read col data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
+  tsdbFree(pBuf1);
+  tsdbFree(pBuf2);
   return code;
 }
 
@@ -691,42 +928,9 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
   pBlockData->nRow = pSubBlock->nRow;
   p = *ppBuf1 + sizeof(*pHdr);
 
-  code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t));
+  // pass p (past the header), not *ppBuf1: the helper checksums and decodes starting at the VERSION data
+  code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
   if (code) goto _err;
-  code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY));
-  if (code) goto _err;
-  if (pSubBlock->cmprAlg == NO_COMPRESSION) {
-    ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
-    ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
-
-    // VERSION
-    memcpy(pBlockData->aVersion, p, pSubBlock->vsize);
-
-    // TSKEY
-    memcpy(pBlockData->aTSKEY, p + pSubBlock->vsize, pSubBlock->ksize);
-  } else {
-    size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
-    if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
-      code = tsdbRealloc(ppBuf2, size);
-      if (code) goto _err;
-    }
-
-    // VERSION
-    n = tsDecompressBigint(p, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
-                           sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
-    if (n < 0) {
-      code = TSDB_CODE_COMPRESS_ERROR;
-      goto _err;
-    }
-
-    // TSKEY
-    n = tsDecompressTimestamp(p + pSubBlock->vsize, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
-                              sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
-    if (n < 0) {
-      code = TSDB_CODE_COMPRESS_ERROR;
-      goto _err;
-    }
-  }
   p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
 
   for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
@@ -744,56 +948,14 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
         code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
         if (code) goto _err;
       }
-      continue;
-    }
-    pColData->nVal = pSubBlock->nRow;
-    pColData->flag = pBlockCol->flag;
-
-    // bitmap
-    if (pBlockCol->flag != HAS_VALUE) {
-      size = BIT2_SIZE(pSubBlock->nRow);
-      code = tsdbRealloc(&pColData->pBitMap, size);
+    } else {
+      code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, p, ppBuf2);
       if (code) goto _err;
 
-      ASSERT(pBlockCol->bsize == size);
-
-      memcpy(pColData->pBitMap, p, size);
-    } else {
-      ASSERT(pBlockCol->bsize == 0);
+      p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
     }
-    p = p + pBlockCol->bsize;
-
-    // value
-    if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
-      pColData->nData = pBlockCol->osize;
-    } else {
-      pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
-    }
-    code = tsdbRealloc(&pColData->pData, pColData->nData);
-    if (code) goto _err;
-
-    if (pSubBlock->cmprAlg == NO_COMPRESSION) {
-      memcpy(pColData->pData, p, pColData->nData);
-    } else {
-      size = pColData->nData + COMP_OVERFLOW_BYTES;
-      if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
-        code = tsdbRealloc(ppBuf2, size);
-        if (code) goto _err;
-      }
-
-      n = tDataTypes[pBlockCol->type].decompFunc(p, pBlockCol->csize, pSubBlock->nRow, pColData->pData, pColData->nData,
-                                                 pSubBlock->cmprAlg, *ppBuf2, size);
-      if (n < 0) {
-        code = TSDB_CODE_COMPRESS_ERROR;
-        goto _err;
-      }
-
-      ASSERT(n == pColData->nData);
-    }
-    p = p + pBlockCol->csize + sizeof(TSCKSUM);
   }
 
-  // TODO
   return code;
 
 _err:
diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c
index 58397b1a9b..489d6d749a 100644
--- a/source/dnode/vnode/src/tsdb/tsdbUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c
@@ -545,6 +545,17 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
   return n;
 }
 
+// Order two SBlockCol entries by column id (cid); returns -1, 0 or 1.
+int32_t tBlockColCmprFn(const void *p1, const void *p2) {
+  if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
+    return -1;
+  } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) {
+    return 1;
+  }
+
+  return 0;
+}
+
 // SDelIdx ======================================================
 int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
   SDelIdx *lDelIdx = *(SDelIdx **)lhs;