From 7fe743d00fc65eb0d5dabefed9528edbceb125c9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 2 Sep 2022 17:04:49 +0800 Subject: [PATCH] refact code for further change --- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 879 +++++++++--------- 1 file changed, 439 insertions(+), 440 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index d02d970943..3aa3aa5182 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -14,382 +14,136 @@ */ #include "tsdb.h" +// =============== PAGE-WISE FILE =============== +typedef struct { + TdFilePtr pFD; + int32_t szPage; + int32_t nBuf; + uint8_t *pBuf; + int64_t pgno; +} STsdbFD; -// SDelFWriter ==================================================== -int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { - int32_t code = 0; - char fname[TSDB_FILENAME_LEN]; - char hdr[TSDB_FHDR_SIZE] = {0}; - SDelFWriter *pDelFWriter; - int64_t n; - - // alloc - pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); - if (pDelFWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pDelFWriter->pTsdb = pTsdb; - pDelFWriter->fDel = *pFile; - - tsdbDelFileName(pTsdb, pFile, fname); - pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); - if (pDelFWriter->pWriteH == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // update header - n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pDelFWriter->fDel.size = TSDB_FHDR_SIZE; - pDelFWriter->fDel.offset = 0; - - *ppWriter = pDelFWriter; - return code; - -_err: - tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; -} - -int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) { - int32_t code = 0; - SDelFWriter *pWriter = *ppWriter; - STsdb *pTsdb = pWriter->pTsdb; - - // sync - if (sync && taosFsyncFile(pWriter->pWriteH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // close - if (taosCloseFile(&pWriter->pWriteH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) { - tFree(pWriter->aBuf[iBuf]); - } - taosMemoryFree(pWriter); - - *ppWriter = NULL; - return code; - -_err: - tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) { +int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { int32_t code = 0; - int64_t size; - int64_t n; - // prepare - size = sizeof(uint32_t); - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { - size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData)); - } - size += sizeof(TSCKSUM); - - // alloc - code = tRealloc(&pWriter->aBuf[0], size); - if (code) goto _err; - - // build - n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { - n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData)); - } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); - - // write - n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); - if (n < 0) { + pFD->pFD = taosOpenFile(path, opt); + if (pFD->pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + goto _exit; } - ASSERT(n == size); - - // update - pDelIdx->offset = pWriter->fDel.size; - pDelIdx->size = size; - pWriter->fDel.size += size; - - return code; - -_err: - tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) { - int32_t code = 0; - int64_t size; - int64_t n; - SDelIdx *pDelIdx; - - // prepare - size = sizeof(uint32_t); - for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { - size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx)); - } - size += sizeof(TSCKSUM); - - // alloc - code = tRealloc(&pWriter->aBuf[0], size); - if (code) goto _err; - - // build - n = 0; - n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); - for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { - n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx)); - } - taosCalcChecksumAppend(0, pWriter->aBuf[0], size); - - ASSERT(n + sizeof(TSCKSUM) == size); - - // write - n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // update - pWriter->fDel.offset = pWriter->fDel.size; - pWriter->fDel.size += size; - - return code; - -_err: - tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) { - int32_t code = 0; - char hdr[TSDB_FHDR_SIZE]; - int64_t size = TSDB_FHDR_SIZE; - int64_t n; - - // build - memset(hdr, 0, size); - tPutDelFile(hdr, &pWriter->fDel); - taosCalcChecksumAppend(0, hdr, size); - - // seek - if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // write - n = taosWriteFile(pWriter->pWriteH, hdr, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - return code; - -_err: - tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -// SDelFReader ==================================================== -struct SDelFReader { - STsdb *pTsdb; - SDelFile fDel; - TdFilePtr pReadH; - - uint8_t *aBuf[1]; -}; - -int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) { - int32_t code = 0; - char fname[TSDB_FILENAME_LEN]; - SDelFReader *pDelFReader; - int64_t n; - - // alloc - pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); - if (pDelFReader == NULL) { + pFD->szPage = 4096; + pFD->pgno = 0; + pFD->nBuf = 0; + pFD->pBuf = taosMemoryMalloc(pFD->szPage); + if (pFD->pBuf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + goto _exit; } - // open impl - pDelFReader->pTsdb = pTsdb; - pDelFReader->fDel = *pFile; - - tsdbDelFileName(pTsdb, pFile, fname); - pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ); - if (pDelFReader->pReadH == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pDelFReader); - goto _err; - } - -_exit: - *ppReader = pDelFReader; - return code; - -_err: - tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppReader = NULL; - return code; -} - -int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { - int32_t code = 0; - SDelFReader *pReader = *ppReader; - - if (pReader) { - if (taosCloseFile(&pReader->pReadH) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) { - tFree(pReader->aBuf[iBuf]); - } - taosMemoryFree(pReader); - } - *ppReader = NULL; - _exit: return code; } -int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) { +void tsdbCloseFile(STsdbFD *pFD) { + taosMemoryFree(pFD->pBuf); + taosCloseFile(&pFD->pFD); +} + +int32_t tsdbSyncFile(STsdbFD *pFD) { int32_t code = 0; - int64_t offset = pDelIdx->offset; - int64_t size = pDelIdx->size; - int64_t n; - taosArrayClear(aDelData); - - // seek - if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { + if (taosFsyncFile(pFD->pFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + goto _exit; } - // alloc - code = tRealloc(&pReader->aBuf[0], size); - if (code) goto _err; - - // read - n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // // decode - n = 0; - - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); - while (n < size - sizeof(TSCKSUM)) { - SDelData delData; - n += tGetDelData(pReader->aBuf[0] + n, &delData); - - if (taosArrayPush(aDelData, &delData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - ASSERT(n == size - sizeof(TSCKSUM)); - - return code; - -_err: - tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); +_exit: return code; } -int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) { +int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) { int32_t code = 0; - int32_t n; - int64_t offset = pReader->fDel.offset; - int64_t size = pReader->fDel.size - offset; - taosArrayClear(aDelIdx); + int32_t n = 0; + while (n < nBuf) { + int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM); + int32_t size = TMIN(remain, nBuf - n); - // seek - if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size); + n += size; + pFD->nBuf += size; - // alloc - code = tRealloc(&pReader->aBuf[0], size); - if (code) goto _err; + if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) { + taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage); - // read - n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } - // check - if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // decode - n = 0; - uint32_t delimiter; - n += tGetU32(pReader->aBuf[0] + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - - while (n < size - sizeof(TSCKSUM)) { - SDelIdx delIdx; - - n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx); - - if (taosArrayPush(aDelIdx, &delIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + pFD->nBuf = 0; } } - ASSERT(n == size - sizeof(TSCKSUM)); - +_exit: return code; +} -_err: - tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); +static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { + int32_t code = 0; + + int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < pFD->szPage) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } + + pFD->pgno = pgno; + +_exit: + return code; +} + +int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { + int32_t code = 0; + + int64_t pgno = offset / pFD->szPage; + int64_t n = 0; + if (pFD->pgno == pgno) { + int64_t bOff = offset % pFD->szPage; + int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count); + memcpy(pBuf + n, pFD->pBuf + bOff, nRead); + n = nRead; + } + + while (n < count) { + code = tsdbReadFilePage(pFD, pgno); + if (code) goto _exit; + + pgno++; + + int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n); + memcpy(pBuf + n, pFD->pBuf, nRead); + n += nRead; + } + +_exit: return code; } @@ -1609,135 +1363,380 @@ _err: return code; } -// =============== PAGE-WISE FILE =============== -typedef struct { - TdFilePtr pFD; - int32_t szPage; - int32_t nBuf; - uint8_t *pBuf; - int64_t pgno; -} STsdbFD; +// SDelFWriter ==================================================== +int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN]; + char hdr[TSDB_FHDR_SIZE] = {0}; + SDelFWriter *pDelFWriter; + int64_t n; -int32_t tsdbOpenFile(const char *path, int32_t opt, STsdbFD *pFD) { - int32_t code = 0; - - pFD->pFD = taosOpenFile(path, opt); - if (pFD->pFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - - pFD->szPage = 4096; - pFD->pgno = 0; - pFD->nBuf = 0; - pFD->pBuf = taosMemoryMalloc(pFD->szPage); - if (pFD->pBuf == NULL) { + // alloc + pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); + if (pDelFWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + goto _err; } + pDelFWriter->pTsdb = pTsdb; + pDelFWriter->fDel = *pFile; -_exit: - return code; -} - -void tsdbCloseFile(STsdbFD *pFD) { - taosMemoryFree(pFD->pBuf); - taosCloseFile(&pFD->pFD); -} - -int32_t tsdbSyncFile(STsdbFD *pFD) { - int32_t code = 0; - - if (taosFsyncFile(pFD->pFD) < 0) { + tsdbDelFileName(pTsdb, pFile, fname); + pDelFWriter->pWriteH = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE); + if (pDelFWriter->pWriteH == NULL) { code = TAOS_SYSTEM_ERROR(errno); - goto _exit; + goto _err; } + // update header + n = taosWriteFile(pDelFWriter->pWriteH, &hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pDelFWriter->fDel.size = TSDB_FHDR_SIZE; + pDelFWriter->fDel.offset = 0; + + *ppWriter = pDelFWriter; + return code; + +_err: + tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) { + int32_t code = 0; + SDelFWriter *pWriter = *ppWriter; + STsdb *pTsdb = pWriter->pTsdb; + + // sync + if (sync && taosFsyncFile(pWriter->pWriteH) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // close + if (taosCloseFile(&pWriter->pWriteH) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) { + tFree(pWriter->aBuf[iBuf]); + } + taosMemoryFree(pWriter); + + *ppWriter = NULL; + return code; + +_err: + tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) { + int32_t code = 0; + int64_t size; + int64_t n; + + // prepare + size = sizeof(uint32_t); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { + size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData)); + } + size += sizeof(TSCKSUM); + + // alloc + code = tRealloc(&pWriter->aBuf[0], size); + if (code) goto _err; + + // build + n = 0; + n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { + n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData)); + } + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); + + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == size); + + // update + pDelIdx->offset = pWriter->fDel.size; + pDelIdx->size = size; + pWriter->fDel.size += size; + + return code; + +_err: + tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) { + int32_t code = 0; + int64_t size; + int64_t n; + SDelIdx *pDelIdx; + + // prepare + size = sizeof(uint32_t); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx)); + } + size += sizeof(TSCKSUM); + + // alloc + code = tRealloc(&pWriter->aBuf[0], size); + if (code) goto _err; + + // build + n = 0; + n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx)); + } + taosCalcChecksumAppend(0, pWriter->aBuf[0], size); + + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pWriteH, pWriter->aBuf[0], size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // update + pWriter->fDel.offset = pWriter->fDel.size; + pWriter->fDel.size += size; + + return code; + +_err: + tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) { + int32_t code = 0; + char hdr[TSDB_FHDR_SIZE]; + int64_t size = TSDB_FHDR_SIZE; + int64_t n; + + // build + memset(hdr, 0, size); + tPutDelFile(hdr, &pWriter->fDel); + taosCalcChecksumAppend(0, hdr, size); + + // seek + if (taosLSeekFile(pWriter->pWriteH, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // write + n = taosWriteFile(pWriter->pWriteH, hdr, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +// SDelFReader ==================================================== +struct SDelFReader { + STsdb *pTsdb; + SDelFile fDel; + TdFilePtr pReadH; + + uint8_t *aBuf[1]; +}; + +int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN]; + SDelFReader *pDelFReader; + int64_t n; + + // alloc + pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); + if (pDelFReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + // open impl + pDelFReader->pTsdb = pTsdb; + pDelFReader->fDel = *pFile; + + tsdbDelFileName(pTsdb, pFile, fname); + pDelFReader->pReadH = taosOpenFile(fname, TD_FILE_READ); + if (pDelFReader->pReadH == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pDelFReader); + goto _err; + } + +_exit: + *ppReader = pDelFReader; + return code; + +_err: + tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; +} + +int32_t tsdbDelFReaderClose(SDelFReader **ppReader) { + int32_t code = 0; + SDelFReader *pReader = *ppReader; + + if (pReader) { + if (taosCloseFile(&pReader->pReadH) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) { + tFree(pReader->aBuf[iBuf]); + } + taosMemoryFree(pReader); + } + *ppReader = NULL; + _exit: return code; } -int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) { +int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) { int32_t code = 0; + int64_t offset = pDelIdx->offset; + int64_t size = pDelIdx->size; + int64_t n; - int32_t n = 0; - while (n < nBuf) { - int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM); - int32_t size = TMIN(remain, nBuf - n); + taosArrayClear(aDelData); - memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size); - n += size; - pFD->nBuf += size; + // seek + if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) { - taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage); + // alloc + code = tRealloc(&pReader->aBuf[0], size); + if (code) goto _err; - int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } + // read + n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } - pFD->nBuf = 0; + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // // decode + n = 0; + + uint32_t delimiter; + n += tGetU32(pReader->aBuf[0] + n, &delimiter); + while (n < size - sizeof(TSCKSUM)) { + SDelData delData; + n += tGetDelData(pReader->aBuf[0] + n, &delData); + + if (taosArrayPush(aDelData, &delData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } } -_exit: + ASSERT(n == size - sizeof(TSCKSUM)); + + return code; + +_err: + tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { +int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) { int32_t code = 0; + int32_t n; + int64_t offset = pReader->fDel.offset; + int64_t size = pReader->fDel.size - offset; - int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET); + taosArrayClear(aDelIdx); + + // seek + if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // alloc + code = tRealloc(&pReader->aBuf[0], size); + if (code) goto _err; + + // read + n = taosReadFile(pReader->pReadH, pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - - n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } else if (n < pFD->szPage) { + goto _err; + } else if (n < size) { code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; + goto _err; } - if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { + // check + if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; + goto _err; } - pFD->pgno = pgno; + // decode + n = 0; + uint32_t delimiter; + n += tGetU32(pReader->aBuf[0] + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); -_exit: + while (n < size - sizeof(TSCKSUM)) { + SDelIdx delIdx; + + n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx); + + if (taosArrayPush(aDelIdx, &delIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + ASSERT(n == size - sizeof(TSCKSUM)); + + return code; + +_err: + tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } - -int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) { - int32_t code = 0; - - int64_t pgno = offset / pFD->szPage; - int64_t n = 0; - if (pFD->pgno == pgno) { - int64_t bOff = offset % pFD->szPage; - int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count); - memcpy(pBuf + n, pFD->pBuf + bOff, nRead); - n = nRead; - } - - while (n < count) { - code = tsdbReadFilePage(pFD, pgno); - if (code) goto _exit; - - pgno++; - - int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n); - memcpy(pBuf + n, pFD->pBuf, nRead); - n += nRead; - } - -_exit: - return code; -} \ No newline at end of file