From fa257f50cefb36378e459a25ac5979824c1aea7f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 23 Aug 2022 15:47:42 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 4 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 38 ++++-- source/dnode/vnode/src/tsdb/tsdbFS.c | 40 ++++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 114 ++++++++---------- 4 files changed, 110 insertions(+), 86 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 1300c2ee8f..4c8208faa7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -589,13 +589,13 @@ struct SDataFWriter { TdFilePtr pHeadFD; TdFilePtr pDataFD; - TdFilePtr pLastFD; TdFilePtr pSmaFD; + TdFilePtr pLastFD; SHeadFile fHead; SDataFile fData; - SLastFile fLast; SSmaFile fSma; + SLastFile fLast[TSDB_MAX_LAST_FILE]; uint8_t *aBuf[4]; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 98ad8ea8fa..8542eca8c9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -437,31 +437,43 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // Writer SHeadFile fHead; SDataFile fData; - SLastFile fLast; SSmaFile fSma; - SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma}; + SLastFile fLast; + SDFileSet wSet = {0}; if (pRSet) { + ASSERT(pRSet->nLastF < pCommitter->maxLast); + fHead = (SHeadFile){.commitID = pCommitter->commitID}; + fData = *pRSet->pDataF; + fSma = *pRSet->pSmaF; + fLast = (SLastFile){.commitID = pCommitter->commitID}; + wSet.diskId = pRSet->diskId; wSet.fid = pCommitter->commitFid; - wSet.nLastF = 1; - fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - fData = *pRSet->pDataF; - fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - fSma = *pRSet->pSmaF; + wSet.pHeadF = &fHead; + wSet.pDataF = &fData; + wSet.pSmaF = &fSma; + for (int8_t iLast = 0; iLast < pRSet->nLastF; iLast++) { + wSet.aLastF[iLast] = pRSet->aLastF[iLast]; + } + wSet.nLastF = pRSet->nLastF + 1; + wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo } else { + fHead = (SHeadFile){.commitID = pCommitter->commitID}; + fData = (SDataFile){.commitID = pCommitter->commitID}; + fSma = (SSmaFile){.commitID = pCommitter->commitID}; + fLast = (SLastFile){.commitID = pCommitter->commitID}; + SDiskID did = {0}; - tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); - tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); wSet.diskId = did; wSet.fid = pCommitter->commitFid; + wSet.pHeadF = &fHead; + wSet.pDataF = &fData; + wSet.pSmaF = &fSma; wSet.nLastF = 1; - fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0}; - fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0}; + wSet.aLastF[0] = &fLast; } code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 000e262b92..e6a90825ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -629,13 +629,35 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { if (c == 0) { *pDFileSet->pHeadF = *pSet->pHeadF; *pDFileSet->pDataF = *pSet->pDataF; - *pDFileSet->aLastF[0] = *pSet->aLastF[0]; *pDFileSet->pSmaF = *pSet->pSmaF; + // last + if (pSet->nLastF > pDFileSet->nLastF) { + ASSERT(pSet->nLastF == pDFileSet->nLastF + 1); + + pDFileSet->aLastF[pDFileSet->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); + if (pDFileSet->aLastF[pDFileSet->nLastF] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + *pDFileSet->aLastF[pDFileSet->nLastF] = *pSet->aLastF[pSet->nLastF - 1]; + pDFileSet->nLastF++; + } else if (pSet->nLastF < pDFileSet->nLastF) { + ASSERT(pSet->nLastF == 1); + for (int32_t iLast = 1; iLast < pDFileSet->nLastF; iLast++) { + taosMemoryFree(pDFileSet->aLastF[iLast]); + } + + *pDFileSet->aLastF[0] = *pSet->aLastF[0]; + pDFileSet->nLastF = 1; + } else { + ASSERT(0); + } goto _exit; } } + ASSERT(pSet->nLastF == 1); SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1}; // head @@ -654,14 +676,6 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } *fSet.pDataF = *pSet->pDataF; - // last - fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (fSet.aLastF[0] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - *fSet.aLastF[0] = *pSet->aLastF[0]; - // sma fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); if (fSet.pSmaF == NULL) { @@ -670,6 +684,14 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } *fSet.pSmaF = *pSet->pSmaF; + // last + fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); + if (fSet.aLastF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + *fSet.aLastF[0] = *pSet->aLastF[0]; + if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 0aa32702dd..e07b0e8f94 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -936,18 +936,22 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - if (code) goto _err; pWriter->pTsdb = pTsdb; - pWriter->wSet = (SDFileSet){.diskId = pSet->diskId, - .fid = pSet->fid, - .pHeadF = &pWriter->fHead, - .pDataF = &pWriter->fData, - .aLastF[0] = &pWriter->fLast, - .pSmaF = &pWriter->fSma}; + pWriter->wSet = (SDFileSet){ + .diskId = pSet->diskId, + .fid = pSet->fid, + .pHeadF = &pWriter->fHead, + .pDataF = &pWriter->fData, + .pSmaF = &pWriter->fSma, + .nLastF = pSet->nLastF // + }; pWriter->fHead = *pSet->pHeadF; pWriter->fData = *pSet->pDataF; - pWriter->fLast = *pSet->aLastF[0]; pWriter->fSma = *pSet->pSmaF; + for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { + pWriter->wSet.aLastF[iLast] = &pWriter->fLast[iLast]; + pWriter->fLast[iLast] = *pSet->aLastF[iLast]; + } // head flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; @@ -998,36 +1002,6 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(n == pWriter->fData.size); } - // last - if (pWriter->fLast.size == 0) { - flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - } else { - flag = TD_FILE_WRITE; - } - tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast, fname); - pWriter->pLastFD = taosOpenFile(fname, flag); - if (pWriter->pLastFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - if (pWriter->fLast.size == 0) { - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pWriter->fLast.size += TSDB_FHDR_SIZE; - } else { - n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_END); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(n == pWriter->fLast.size); - } - // sma if (pWriter->fSma.size == 0) { flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; @@ -1058,6 +1032,22 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(n == pWriter->fSma.size); } + // last + ASSERT(pWriter->fLast[pSet->nLastF - 1].size == 0); + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast[pSet->nLastF - 1], fname); + pWriter->pLastFD = taosOpenFile(fname, flag); + if (pWriter->pLastFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pWriter->fLast[pWriter->wSet.nLastF - 1].size += TSDB_FHDR_SIZE; + *ppWriter = pWriter; return code; @@ -1085,12 +1075,12 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { goto _err; } - if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { + if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) { + if (taosFsyncFile((*ppWriter)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -1106,12 +1096,12 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { goto _err; } - if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) { + if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -1168,23 +1158,6 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { goto _err; } - // last ============== - memset(hdr, 0, TSDB_FHDR_SIZE); - tPutLastFile(hdr, &pWriter->fLast); - taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); - - n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - // sma ============== memset(hdr, 0, TSDB_FHDR_SIZE); tPutSmaFile(hdr, &pWriter->fSma); @@ -1202,6 +1175,23 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { goto _err; } + // last ============== + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutLastFile(hdr, &pWriter->fLast[pWriter->wSet.nLastF - 1]); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); + + n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + return code; _err: @@ -1309,7 +1299,7 @@ _err: int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t code = 0; - SLastFile *pLastFile = &pWriter->fLast; + SLastFile *pLastFile = &pWriter->fLast[pWriter->wSet.nLastF - 1]; int64_t size; int64_t n; @@ -1437,7 +1427,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ASSERT(pBlockData->nRow > 0); - pBlkInfo->offset = toLast ? pWriter->fLast.size : pWriter->fData.size; + pBlkInfo->offset = toLast ? pWriter->fLast[pWriter->wSet.nLastF - 1].size : pWriter->fData.size; pBlkInfo->szBlock = 0; pBlkInfo->szKey = 0; @@ -1481,7 +1471,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock // update info if (toLast) { - pWriter->fLast.size += pBlkInfo->szBlock; + pWriter->fLast[pWriter->wSet.nLastF - 1].size += pBlkInfo->szBlock; } else { pWriter->fData.size += pBlkInfo->szBlock; }