diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a404a3c9bb..1818c3ec9f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -593,11 +593,13 @@ struct STsdbReadSnap { }; typedef struct { - TdFilePtr pFD; + char *path; int32_t szPage; + int32_t flag; + TdFilePtr pFD; + int64_t pgno; int32_t nBuf; uint8_t *pBuf; - int64_t pgno; } STsdbFD; struct SDataFWriter { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 80fef30a24..a6141311cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -18,30 +18,38 @@ #define TSDB_DEFAULT_PAGE_SIZE 4096 // =============== PAGE-WISE FILE =============== -static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t opt, STsdbFD **ppFD) { +#define PAGE_CONTENT_SIZE(SIZE) ((SIZE) - sizeof(TSCKSUM)) +#define PAGE_OFFSET(PGNO, SIZE) (((PGNO)-1) * (SIZE)) +#define OFFSET_PGNO(OFFSET, SIZE) ((OFFSET) / (SIZE) + 1) + +static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; STsdbFD *pFD; *ppFD = NULL; - pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD)); + pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1); if (pFD == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - pFD->pFD = taosOpenFile(path, opt); + pFD->path = (char *)&pFD[1]; + strcpy(pFD->path, path); + pFD->szPage = szPage; + pFD->flag = flag; + pFD->pFD = taosOpenFile(path, flag); if (pFD->pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); goto _exit; } - pFD->szPage = szPage; pFD->pgno = 0; pFD->nBuf = 0; - pFD->pBuf = taosMemoryMalloc(pFD->szPage); + pFD->pBuf = taosMemoryMalloc(szPage); if (pFD->pBuf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pFD); goto _exit; } *ppFD = pFD; @@ -102,12 +110,15 @@ _exit: static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { int32_t code = 0; - int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET); + // seek + int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); + int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _exit; } + // read n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -117,6 +128,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { goto _exit; } + // check if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { code = TSDB_CODE_FILE_CORRUPTED; goto _exit; @@ -128,27 +140,33 @@ _exit: return code; } -static int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { +static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { int32_t code = 0; + int64_t n; + int64_t pgno = OFFSET_PGNO(offset, pFD->szPage); + int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage); - int64_t pgno = offset / pFD->szPage; - int64_t n = 0; + ASSERT(pgno); if (pFD->pgno == pgno) { int64_t bOff = offset % pFD->szPage; - int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count); - memcpy(pBuf + n, pFD->pBuf + bOff, nRead); + int64_t nRead = TMIN(szPgCont - bOff, count); + + ASSERT(bOff < szPgCont); + + memcpy(pBuf, pFD->pBuf + bOff, nRead); n = nRead; + pgno++; } while (n < count) { code = tsdbReadFilePage(pFD, pgno); if (code) goto _exit; - pgno++; - - int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n); + int64_t nRead = TMIN(szPgCont, count - n); memcpy(pBuf + n, pFD->pBuf, nRead); + n += nRead; + pgno++; } _exit: @@ -747,7 +765,7 @@ _err: int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { int32_t code = 0; - if (*ppReader == NULL) goto _exit; + if (*ppReader == NULL) return code; // head tsdbCloseFile(&(*ppReader)->pHeadFD); @@ -767,8 +785,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { tFree((*ppReader)->aBuf[iBuf]); } taosMemoryFree(*ppReader); - -_exit: *ppReader = NULL; return code; @@ -778,49 +794,27 @@ _err: } int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { - int32_t code = 0; - int64_t offset = pReader->pSet->pHeadF->offset; - int64_t size = pReader->pSet->pHeadF->size - offset; - int64_t n; - uint32_t delimiter; + int32_t code = 0; + int64_t offset = pReader->pSet->pHeadF->offset; + int64_t size = pReader->pSet->pHeadF->size - offset; // todo taosArrayClear(aBlockIdx); - if (size == 0) { - goto _exit; - } + if (size == 0) return code; // alloc code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // // seek - // if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { - // code = TAOS_SYSTEM_ERROR(errno); - // goto _err; - // } - // read - n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); + if (code) goto _err; // decode - n = 0; - n = tGetU32(pReader->aBuf[0] + n, &delimiter); + uint32_t delimiter; + int64_t n = tGetU32(pReader->aBuf[0], &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - while (n < size - sizeof(TSCKSUM)) { + while (n < size) { SBlockIdx blockIdx; n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx); @@ -829,10 +823,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { goto _err; } } + ASSERT(n == size); - ASSERT(n + sizeof(TSCKSUM) == size); - -_exit: return code; _err: @@ -841,65 +833,41 @@ _err: } int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { - int32_t code = 0; - int64_t offset = pReader->pSet->aSstF[iSst]->offset; - int64_t size = pReader->pSet->aSstF[iSst]->size - offset; - int64_t n; - uint32_t delimiter; + int32_t code = 0; + int64_t offset = pReader->pSet->aSstF[iSst]->offset; + int64_t size = pReader->pSet->aSstF[iSst]->size - offset; // todo taosArrayClear(aSstBlk); - if (size == 0) { - goto _exit; - } + if (size == 0) return code; // alloc code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // // seek - // if (taosLSeekFile(pReader->aSstFD[iSst], offset, SEEK_SET) < 0) { - // code = TAOS_SYSTEM_ERROR(errno); - // goto _err; - // } - // read - n = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size); + if (code) goto _err; // decode - n = 0; - n = tGetU32(pReader->aBuf[0] + n, &delimiter); + uint32_t delimiter; + int64_t n = tGetU32(pReader->aBuf[0], &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - while (n < size - sizeof(TSCKSUM)) { - SSstBlk blockl; - n += tGetSstBlk(pReader->aBuf[0] + n, &blockl); + while (n < size) { + SSstBlk sstBlk; + n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk); - if (taosArrayPush(aSstBlk, &blockl) == NULL) { + if (taosArrayPush(aSstBlk, &sstBlk) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } + ASSERT(n == size); - ASSERT(n + sizeof(TSCKSUM) == size); - -_exit: return code; _err: - tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } @@ -907,49 +875,26 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl int32_t code = 0; int64_t offset = pBlockIdx->offset; int64_t size = pBlockIdx->size; - int64_t n; - int64_t tn; // alloc code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // // seek - // if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { - // code = TAOS_SYSTEM_ERROR(errno); - // goto _err; - // } - // read - n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size); + if (code) goto _err; // decode - n = 0; - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); + int64_t n = tGetU32(pReader->aBuf[0], &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - tn = tGetMapData(pReader->aBuf[0] + n, mBlock); + int64_t tn = tGetMapData(pReader->aBuf[0] + n, mBlock); if (tn < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - n += tn; - ASSERT(n + sizeof(TSCKSUM) == size); + ASSERT(n + tn == size); return code; @@ -967,48 +912,26 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol taosArrayClear(aColumnDataAgg); // alloc - int32_t size = pSmaInfo->size + sizeof(TSCKSUM); + int32_t size = pSmaInfo->size; code = tRealloc(&pReader->aBuf[0], size); if (code) goto _err; - // // seek - // int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET); - // if (n < 0) { - // code = TAOS_SYSTEM_ERROR(errno); - // goto _err; - // } else if (n < pSmaInfo->offset) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // read - int64_t n = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size); + if (code) goto _err; // decode - n = 0; + int32_t n = 0; while (n < pSmaInfo->size) { SColumnDataAgg sma; - n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma); + if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } - + ASSERT(n == pSmaInfo->size); return code; _err: