From 81897d32afca827cc98912c9aecd6ce89746bccf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 23 Jun 2022 10:30:19 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 20 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 21 +- source/dnode/vnode/src/tsdb/tsdbFS.c | 79 ++++++-- source/dnode/vnode/src/tsdb/tsdbFile.c | 2 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 189 +++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 28 +++ 6 files changed, 314 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5dfcb06f78..b1ff4c9d3f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct SDelFReader SDelFReader; typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; typedef struct SRowMerger SRowMerger; +typedef struct STsdbFSState STsdbFSState; #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_FHDR_SIZE 512 @@ -146,6 +147,8 @@ void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); +int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), + int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t tPutMapData(uint8_t *p, SMapData *pMapData); int32_t tGetMapData(uint8_t *p, SMapData *pMapData); // other @@ -166,8 +169,8 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); // tsdbFile.c ============================================================================================== -enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE }; -void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]); +typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT; +void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]); // SDelFile #define tsdbDelFileCreate() \ ((SDelFile){ \ @@ -179,6 +182,11 @@ int32_t tsdbFSClose(STsdbFS *pFS); int32_t tsdbFSBegin(STsdbFS *pFS); int32_t tsdbFSCommit(STsdbFS *pFS); int32_t tsdbFSRollback(STsdbFS *pFS); + +int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile); +int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet); +SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState); +SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); // tsdbReaderWriter.c ============================================================================================== // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); @@ -499,6 +507,14 @@ struct SRowMerger { SArray *pArray; // SArray }; +struct STsdbFS { + STsdb *pTsdb; + TdThreadRwlock lock; + int8_t inTxn; + STsdbFSState *cState; + STsdbFSState *nState; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index f029f8a820..6ed12e05f8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -649,8 +649,8 @@ _err: static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; // TODO - SDFileSet *pWSet = NULL; // TODO + SDFileSet *pRSet = NULL; + SDFileSet *pWSet = NULL; // memory pCommitter->nextKey = TSKEY_MAX; @@ -660,6 +660,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { tMapDataReset(&pCommitter->oBlockMap); tBlockReset(&pCommitter->oBlock); tBlockDataReset(&pCommitter->oBlockData); + pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; @@ -673,6 +674,22 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { tMapDataReset(&pCommitter->nBlockMap); tBlockReset(&pCommitter->nBlock); tBlockDataReset(&pCommitter->nBlockData); + if (pRSet) { + pWSet = &(SDFileSet){.diskId = pRSet->diskId, + .fid = pCommitter->commitFid, + .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, + .fData = pRSet->fData, + .fLast = {.commitID = pCommitter->commitID, .size = 0}, + .fSma = pRSet->fSma}; + } else { + SDiskID did = {.level = 0, .id = 0}; // TODO: alloc a new one + pWSet = &(SDFileSet){.diskId = did, + .fid = pCommitter->commitFid, + .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, + .fData = {.commitID = pCommitter->commitID, .size = 0}, + .fLast = {.commitID = pCommitter->commitID, .size = 0}, + .fSma = {.commitID = pCommitter->commitID, .size = 0}}; + } code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 5896cd4855..f2e01b6baa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -15,18 +15,10 @@ #include "tsdb.h" -typedef struct { +struct STsdbFSState { SDelFile *pDelFile; SArray *aDFileSet; // SArray SDelFile delFile; -} STsdbFSState; - -struct STsdbFS { - STsdb *pTsdb; - TdThreadRwlock lock; - int8_t inTxn; - STsdbFSState *cState; - STsdbFSState *nState; }; // ================================================================================================= @@ -373,8 +365,8 @@ static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSS // SDFileSet while (iFrom < nFrom && iTo < nTo) { - pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom); - pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo); + pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); + pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo); if (pDFileSetFrom->fid == pDFileSetTo->fid) { code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo); @@ -393,7 +385,7 @@ static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSS } while (iFrom < nFrom) { - pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom); + pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom); code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL); if (code) goto _err; @@ -613,6 +605,16 @@ _err: return code; } +static int32_t tDFileSetCmprFn(const void *p1, const void *p2) { + if (((SDFileSet *)p1)->fid < ((SDFileSet *)p2)->fid) { + return -1; + } else if (((SDFileSet *)p1)->fid > ((SDFileSet *)p2)->fid) { + return 1; + } + + return 0; +} + // EXPOSED APIS ==================================================================================== int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) { int32_t code = 0; @@ -654,12 +656,17 @@ int32_t tsdbFSBegin(STsdbFS *pFS) { ASSERT(!pFS->inTxn); - pFS->inTxn = 1; + // SDelFile + pFS->nState->pDelFile = NULL; + if (pFS->cState->pDelFile) { + pFS->nState->delFile = pFS->cState->delFile; + pFS->nState->pDelFile = &pFS->nState->delFile; + } - pFS->nState->pDelFile = pFS->cState->pDelFile; + // SArray taosArrayClear(pFS->nState->aDFileSet); - for (int32_t iDFileSet = 0; iDFileSet < taosArrayGetSize(pFS->cState->aDFileSet); iDFileSet++) { - SDFileSet *pDFileSet = (SDFileSet *)taosArrayGetP(pFS->cState->aDFileSet, iDFileSet); + for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->cState->aDFileSet); iSet++) { + SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->cState->aDFileSet, iSet); if (taosArrayPush(pFS->nState->aDFileSet, &pDFileSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -667,6 +674,7 @@ int32_t tsdbFSBegin(STsdbFS *pFS) { } } + pFS->inTxn = 1; return code; _err: @@ -713,3 +721,42 @@ _err: tsdbError("vgId:%d tsdb fs rollback failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); return code; } + +int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile) { + int32_t code = 0; + pState->delFile = *pDelFile; + pState->pDelFile = &pState->delFile; + return code; +} + +int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet) { + int32_t code = 0; + int32_t idx = taosArraySearchIdx(pState->aDFileSet, pSet, tDFileSetCmprFn, TD_GE); + + if (idx < 0) { + if (taosArrayPush(pState->aDFileSet, pSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } else { + SDFileSet *tDFileSet = (SDFileSet *)taosArrayGet(pState->aDFileSet, idx); + int32_t c = tDFileSetCmprFn(pSet, tDFileSet); + if (c == 0) { + taosArraySet(pState->aDFileSet, idx, pSet); + } else { + if (taosArrayInsert(pState->aDFileSet, idx, pSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + } + +_exit: + return code; +} + +SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; } + +SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) { + return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index fc827be048..a7800c7610 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -15,7 +15,7 @@ #include "tsdb.h" -void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]) { +void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]) { STfs *pTfs = pTsdb->pVnode->pTfs; switch (ftype) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 17ec4e4e65..d48000231c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -405,8 +405,58 @@ struct SDataFReader { }; int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { - int32_t code = 0; - // TODO + int32_t code = 0; + SDataFReader *pReader; + char fname[TSDB_FILENAME_LEN]; + + // alloc + pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTsdb = pTsdb; + pReader->pSet = pSet; + + // open impl + // head + tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname); + pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pHeadFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // data + tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname); + pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pDataFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // last + tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname); + pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pLastFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // sma + tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname); + pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ); + if (pReader->pSmaFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + *ppReader = pReader; + return code; + +_err: + tsdbError("vgId:%d tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppReader = NULL; return code; } @@ -566,8 +616,139 @@ struct SDataFWriter { }; int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { - int32_t code = 0; - // TODO + int32_t code = 0; + int32_t flag; + int64_t n; + SDataFWriter *pWriter = NULL; + char fname[TSDB_FILENAME_LEN]; + char hdr[TSDB_FHDR_SIZE] = {0}; + + // alloc + pWriter = taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTsdb = pTsdb; + pWriter->pSet = pSet; + + // create the directory if not there + + // head + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname); + pWriter->pHeadFD = taosOpenFile(fname, flag); + if (pWriter->pHeadFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == TSDB_FHDR_SIZE); + + pSet->fHead.size += TSDB_FHDR_SIZE; + + // data + if (pSet->fData.size == 0) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + } else { + flag = TD_FILE_WRITE; + } + tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname); + pWriter->pDataFD = taosOpenFile(fname, flag); + if (pWriter->pDataFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (pSet->fData.size == 0) { + n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pSet->fData.size += TSDB_FHDR_SIZE; + } else { + n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == pSet->fData.size); + } + + // last + if (pSet->fLast.size == 0) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + } else { + flag = TD_FILE_WRITE; + } + tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname); + pWriter->pLastFD = taosOpenFile(fname, flag); + if (pWriter->pLastFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (pSet->fLast.size == 0) { + n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pSet->fLast.size += TSDB_FHDR_SIZE; + } else { + n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_END); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == pSet->fLast.size); + } + + // sma + if (pSet->fSma.size == 0) { + flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; + } else { + flag = TD_FILE_WRITE; + } + tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname); + pWriter->pSmaFD = taosOpenFile(fname, flag); + if (pWriter->pSmaFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + if (pSet->fSma.size == 0) { + n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pSet->fSma.size += TSDB_FHDR_SIZE; + } else { + n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(n == pSet->fSma.size); + } + + *ppWriter = pWriter; + return code; + +_err: + tsdbError("vgId:%d tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + *ppWriter = NULL; return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 68b649869e..1b93188afd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -70,6 +70,34 @@ static int32_t tMapDataGetOffset(SMapData *pMapData, int32_t idx) { ASSERT(0); } } +int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), + int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { + int32_t code = 0; + int32_t lidx = 0; + int32_t ridx = pMapData->nItem - 1; + int32_t midx; + int32_t c; + + while (lidx <= ridx) { + midx = (lidx + ridx) / 2; + + tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn); + + c = tItemCmprFn(pSearchItem, pItem); + if (c == 0) { + goto _exit; + } else if (c < 0) { + ridx = midx - 1; + } else { + lidx = midx + 1; + } + } + + code = TSDB_CODE_NOT_FOUND; + +_exit: + return code; +} int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { int32_t code = 0;