diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index a6141311cf..cd2c6e7051 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -18,9 +18,12 @@ #define TSDB_DEFAULT_PAGE_SIZE 4096 // =============== PAGE-WISE FILE =============== -#define PAGE_CONTENT_SIZE(SIZE) ((SIZE) - sizeof(TSCKSUM)) -#define PAGE_OFFSET(PGNO, SIZE) (((PGNO)-1) * (SIZE)) -#define OFFSET_PGNO(OFFSET, SIZE) ((OFFSET) / (SIZE) + 1) +#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) +#define LOGIC_TO_FILE_OFFSET(OFFSET, PAGE) \ + ((OFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (OFFSET) % PAGE_CONTENT_SIZE(PAGE)) +#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE)) +#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE)) +#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1) static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; @@ -140,16 +143,17 @@ _exit: return code; } -static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { +static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t n; - int64_t pgno = OFFSET_PGNO(offset, pFD->szPage); + int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); + int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage); ASSERT(pgno); if (pFD->pgno == pgno) { - int64_t bOff = offset % pFD->szPage; - int64_t nRead = TMIN(szPgCont - bOff, count); + int64_t bOff = fOffset % pFD->szPage; + int64_t nRead = TMIN(szPgCont - bOff, size); ASSERT(bOff < szPgCont); @@ -158,11 +162,11 @@ static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t pgno++; } - while (n < count) { + while (n < size) { code = tsdbReadFilePage(pFD, pgno); if (code) goto _exit; - int64_t nRead = TMIN(szPgCont, count - n); + int64_t nRead = TMIN(szPgCont, size - n); memcpy(pBuf + n, pFD->pBuf, nRead); n += nRead; @@ -794,9 +798,10 @@ _err: } int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { - int32_t code = 0; - int64_t offset = pReader->pSet->pHeadF->offset; - int64_t size = pReader->pSet->pHeadF->size - offset; // todo + int32_t code = 0; + SHeadFile *pHeadFile = pReader->pSet->pHeadF; + int64_t offset = pHeadFile->offset; + int64_t size = pHeadFile->size - offset; taosArrayClear(aBlockIdx); if (size == 0) return code; @@ -810,10 +815,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { if (code) goto _err; // decode - uint32_t delimiter; - int64_t n = tGetU32(pReader->aBuf[0], &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - + int64_t n = 0; while (n < size) { SBlockIdx blockIdx; n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx); @@ -833,9 +835,10 @@ _err: } int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { - int32_t code = 0; - int64_t offset = pReader->pSet->aSstF[iSst]->offset; - int64_t size = pReader->pSet->aSstF[iSst]->size - offset; // todo + int32_t code = 0; + SSstFile *pSstFile = pReader->pSet->aSstF[iSst]; + int64_t offset = pSstFile->offset; + int64_t size = pSstFile->size - offset; taosArrayClear(aSstBlk); if (size == 0) return code; @@ -849,10 +852,7 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { if (code) goto _err; // decode - uint32_t delimiter; - int64_t n = tGetU32(pReader->aBuf[0], &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - + int64_t n = 0; while (n < size) { SSstBlk sstBlk; n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk); @@ -885,16 +885,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl if (code) goto _err; // decode - uint32_t delimiter; - int64_t n = tGetU32(pReader->aBuf[0], &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - - int64_t tn = tGetMapData(pReader->aBuf[0] + n, mBlock); - if (tn < 0) { + int64_t n = tGetMapData(pReader->aBuf[0], mBlock); + if (n < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - ASSERT(n + tn == size); + ASSERT(n == size); return code; @@ -912,12 +908,11 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol taosArrayClear(aColumnDataAgg); // alloc - int32_t size = pSmaInfo->size; - code = tRealloc(&pReader->aBuf[0], size); + code = tRealloc(&pReader->aBuf[0], pSmaInfo->size); if (code) goto _err; // read - code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size); + code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size); if (code) goto _err; // decode @@ -1117,7 +1112,8 @@ int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk int32_t code = 0; // read - tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock); + code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock); + if (code) goto _exit; // decmpr code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);