diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 1818c3ec9f..da74dc9828 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -598,8 +598,8 @@ typedef struct { int32_t flag; TdFilePtr pFD; int64_t pgno; - int32_t nBuf; uint8_t *pBuf; + int64_t szFile; } STsdbFD; struct SDataFWriter { @@ -609,7 +609,7 @@ struct SDataFWriter { STsdbFD *pHeadFD; STsdbFD *pDataFD; STsdbFD *pSmaFD; - STsdbFD *pLastFD; + STsdbFD *pSstFD; SHeadFile fHead; SDataFile fData; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 1433d7c0be..0f50714d3e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -48,13 +48,18 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd } pFD->szPage = szPage; pFD->pgno = 0; - pFD->nBuf = 0; pFD->pBuf = taosMemoryMalloc(szPage); if (pFD->pBuf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pFD); goto _exit; } + if (taosStatFile(path, &pFD->szFile, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + ASSERT(pFD->szFile % szPage == 0); + pFD->szFile = pFD->szFile / szPage; *ppFD = pFD; _exit: @@ -69,42 +74,29 @@ static void tsdbCloseFile(STsdbFD **ppFD) { *ppFD = NULL; } -static int32_t tsdbFsyncFile(STsdbFD *pFD) { +static int32_t tsdbWriteFilePage(STsdbFD *pFD) { int32_t code = 0; - if (taosFsyncFile(pFD->pFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } + if (pFD->pgno > 0) { + int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } -_exit: - return code; -} + taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage); -static int32_t tsdbWriteFile(STsdbFD *pFD, uint8_t *pBuf, int32_t nBuf, int64_t *offset) { - int32_t code = 0; + n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } - int32_t n = 0; - while (n < nBuf) { - int32_t remain = pFD->szPage - pFD->nBuf - sizeof(TSCKSUM); - int32_t size = TMIN(remain, nBuf - n); - - memcpy(pFD->pBuf + pFD->nBuf, pBuf + n, size); - n += size; - pFD->nBuf += size; - - if (pFD->nBuf + sizeof(TSCKSUM) == pFD->szPage) { - taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage); - - int64_t n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - - pFD->nBuf = 0; + if (pFD->szFile < pFD->pgno) { + pFD->szFile = pFD->szFile; } } + pFD->pgno = 0; _exit: return code; @@ -113,6 +105,8 @@ _exit: static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { int32_t code = 0; + ASSERT(pgno <= pFD->szFile); + // seek int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); @@ -143,6 +137,38 @@ _exit: return code; } +static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { + int32_t code = 0; + int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); + int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); + int64_t bOffset = fOffset % pFD->szPage; + int64_t n = 0; + + do { + if (pFD->pgno != pgno) { + code = tsdbWriteFilePage(pFD); + if (code) goto _exit; + + if (pgno < pFD->szFile) { + code = tsdbReadFilePage(pFD, pgno); + if (code) goto _exit; + } else { + pFD->pgno = pgno; + } + } + + int64_t nRead = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n); + memcpy(pFD->pBuf + bOffset, pBuf + n, nRead); + + pgno++; + bOffset = 0; + n += nRead; + } while (n < size); + +_exit: + return code; +} + static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t n; @@ -177,9 +203,18 @@ _exit: return code; } -static int32_t tsdbLSeekFile(STsdbFD *pFD, int64_t offset) { +static int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; - ASSERT(0); + + code = tsdbWriteFilePage(pFD); + if (code) goto _exit; + + if (taosFsyncFile(pFD->pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + +_exit: return code; } @@ -220,11 +255,8 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD); if (code) goto _err; - code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; - - ASSERT(n == TSDB_FHDR_SIZE); - pWriter->fHead.size += TSDB_FHDR_SIZE; // data @@ -237,12 +269,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD); if (code) goto _err; if (pWriter->fData.size == 0) { - code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; pWriter->fData.size += TSDB_FHDR_SIZE; - } else { - // code = tsdbLSeekFile(pWriter->pDataFD, 0, SEEK_END); - // if (code) goto _err; } // sma @@ -255,22 +284,19 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD); if (code) goto _err; if (pWriter->fSma.size == 0) { - code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; pWriter->fSma.size += TSDB_FHDR_SIZE; - } else { - code = tsdbLSeekFile(pWriter->pSmaFD, 0); - if (code) goto _err; } // sst ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pLastFD); + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD); if (code) goto _err; - code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; @@ -291,31 +317,23 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { pTsdb = (*ppWriter)->pTsdb; if (sync) { - if (tsdbFsyncFile((*ppWriter)->pHeadFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbFsyncFile((*ppWriter)->pHeadFD); + if (code) goto _err; - if (tsdbFsyncFile((*ppWriter)->pDataFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbFsyncFile((*ppWriter)->pDataFD); + if (code) goto _err; - if (tsdbFsyncFile((*ppWriter)->pSmaFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbFsyncFile((*ppWriter)->pSmaFD); + if (code) goto _err; - if (tsdbFsyncFile((*ppWriter)->pLastFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbFsyncFile((*ppWriter)->pSstFD); + if (code) goto _err; } tsdbCloseFile(&(*ppWriter)->pHeadFD); tsdbCloseFile(&(*ppWriter)->pDataFD); tsdbCloseFile(&(*ppWriter)->pSmaFD); - tsdbCloseFile(&(*ppWriter)->pLastFD); + tsdbCloseFile(&(*ppWriter)->pSstFD); for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) { tFree((*ppWriter)->aBuf[iBuf]); @@ -338,41 +356,25 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { // head ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutHeadFile(hdr, &pWriter->fHead); - - code = tsdbLSeekFile(pWriter->pHeadFD, 0); - if (code) goto _err; - - code = tsdbWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; // data ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutDataFile(hdr, &pWriter->fData); - - code = tsdbLSeekFile(pWriter->pDataFD, 0); - if (code) goto _err; - - code = tsdbWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; // sma ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutSmaFile(hdr, &pWriter->fSma); - - code = tsdbLSeekFile(pWriter->pSmaFD, 0); - if (code) goto _err; - - code = tsdbWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; // sst ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); - - code = tsdbLSeekFile(pWriter->pLastFD, 0); - if (code) goto _err; - - code = tsdbWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE, NULL); + code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; return code; @@ -385,7 +387,7 @@ _err: int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { int32_t code = 0; SHeadFile *pHeadFile = &pWriter->fHead; - int64_t size = 0; + int64_t size; int64_t n; // check @@ -395,6 +397,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { } // prepare + size = 0; for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); } @@ -411,7 +414,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) { ASSERT(n == size); // write - code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL); + code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); if (code) goto _err; // update @@ -442,10 +445,10 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc if (code) goto _err; // build - n = tPutMapData(pWriter->aBuf[0] + n, mBlock); + n = tPutMapData(pWriter->aBuf[0], mBlock); // write - code = tsdbWriteFile(pWriter->pHeadFD, pWriter->aBuf[0], size, NULL); + code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); if (code) goto _err; // update @@ -467,7 +470,7 @@ _err: int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { int32_t code = 0; SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; - int64_t size = 0; + int64_t size; int64_t n; // check @@ -477,6 +480,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { } // size + size = 0; for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); } @@ -492,7 +496,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { } // write - code = tsdbWriteFile(pWriter->pLastFD, pWriter->aBuf[0], size, NULL); + code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size); if (code) goto _err; // update @@ -500,7 +504,7 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { pSstFile->size += size; _exit: - tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), + tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pSstFile->offset, size); return code; @@ -534,11 +538,11 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size); if (code) goto _err; - code = tsdbWriteFile(pWriter->pSmaFD, pWriter->aBuf[0], pSmaInfo->size, NULL); + code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size); if (code) goto _err; pSmaInfo->offset = pWriter->fSma.size; - // pWriter->fSma.size += size; + pWriter->fSma.size += pSmaInfo->size; } return code; @@ -554,7 +558,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ASSERT(pBlockData->nRow > 0); - pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size; + if (toLast) { + pBlkInfo->offset = pWriter->fSst[pWriter->wSet.nSstF - 1].size; + } else { + pBlkInfo->offset = pWriter->fData.size; + } pBlkInfo->szBlock = 0; pBlkInfo->szKey = 0; @@ -563,24 +571,28 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock if (code) goto _err; // write ================= - STsdbFD *pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + STsdbFD *pFD = toLast ? pWriter->pSstFD : pWriter->pDataFD; pBlkInfo->szKey = aBufN[3] + aBufN[2]; pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; - code = tsdbWriteFile(pFD, pWriter->aBuf[3], aBufN[3], NULL); + int64_t offset = pBlkInfo->offset; + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[3], aBufN[3]); if (code) goto _err; + offset += aBufN[3]; - code = tsdbWriteFile(pFD, pWriter->aBuf[2], aBufN[2], NULL); + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[2], aBufN[2]); if (code) goto _err; + offset += aBufN[2]; if (aBufN[1]) { - code = tsdbWriteFile(pFD, pWriter->aBuf[1], aBufN[1], NULL); + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[1], aBufN[1]); if (code) goto _err; + offset += aBufN[1]; } if (aBufN[0]) { - code = tsdbWriteFile(pFD, pWriter->aBuf[0], aBufN[0], NULL); + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], aBufN[0]); if (code) goto _err; }