From 5037b49593e9201fef70beaf7e5aa4b34d3baa1b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 21 Jan 2021 02:31:58 +0000 Subject: [PATCH] reload data from file --- src/inc/tsdb.h | 4 ++-- src/tsdb/inc/tsdbFS.h | 1 + src/tsdb/inc/tsdbint.h | 1 + src/tsdb/src/tsdbFS.c | 5 +++-- src/tsdb/src/tsdbMain.c | 3 +-- src/tsdb/src/tsdbSync.c | 40 +++++++++++++++++++++++++++++++++++----- 6 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 23d2fbc78c..8af2feb6c8 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -330,8 +330,8 @@ void tsdbIncCommitRef(int vgId); void tsdbDecCommitRef(int vgId); // For TSDB file sync -int tsdbSyncSend(STsdbRepo *pRepo, int socketFd); -int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd); +int tsdbSyncSend(void *pRepo, int socketFd); +int tsdbSyncRecv(void *pRepo, int socketFd); #ifdef __cplusplus } diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 057dad6bd4..45ff223a17 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -83,6 +83,7 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); void tsdbFSIterSeek(SFSIter *pIter, int fid); SDFileSet *tsdbFSIterNext(SFSIter *pIter); +int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index d0c575a876..6202e0d783 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -94,6 +94,7 @@ int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); +int tsdbRestoreInfo(STsdbRepo* pRepo); static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { ASSERT(pRepo != NULL); diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 6398fedfee..96f08b5271 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -26,7 +26,6 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); -static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); // ================== CURRENT file header info static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { @@ -690,7 +689,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { return 0; } -static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { +int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { char tbuf[128]; STsdbFS * pfs = REPO_FS(pRepo); SMFile mf; @@ -700,6 +699,8 @@ static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { int64_t maxBufSize = 0; SMFInfo minfo; + // TODO: clear meta at first + // No meta file, just return if (pfs->cstatus->pmf == NULL) return 0; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index f0780b207b..9ee5109206 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -28,7 +28,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); -static int tsdbRestoreInfo(STsdbRepo *pRepo); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -539,7 +538,7 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } -static int tsdbRestoreInfo(STsdbRepo *pRepo) { +int tsdbRestoreInfo(STsdbRepo *pRepo) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 97e0cf3ffd..7195a3b818 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -24,6 +24,7 @@ typedef struct { SRtn rtn; int32_t socketFd; void * pBuf; + bool mfChanged; SMFile * pmf; SMFile mf; SDFileSet df; @@ -46,9 +47,11 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet); static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet); static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch); +static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged); -int32_t tsdbSyncSend(STsdbRepo *pRepo, int32_t socketFd) { - SSyncH synch = {0}; +int32_t tsdbSyncSend(void *tsdb, int32_t socketFd) { + STsdbRepo *pRepo = (STsdbRepo *)tsdb; + SSyncH synch = {0}; tsdbInitSyncH(&synch, pRepo, socketFd); // Disable TSDB commit @@ -75,7 +78,8 @@ _err: return -1; } -int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) { +int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) { + STsdbRepo *pRepo = (STsdbRepo *)tsdb; SSyncH synch; tsdbInitSyncH(&synch, pRepo, socketFd); @@ -91,10 +95,12 @@ int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) { goto _err; } - // TODO: need to restart TSDB or reload TSDB here - tsdbEndFSTxn(pRepo); tsdbDestroySyncH(&synch); + + // Reload file change + tsdbReload(pRepo, synch.mfChanged); + return 0; _err: @@ -179,6 +185,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) { // Local has no meta file or has a different meta file, need to copy from remote + pSynch->mfChanged = true; + if (tsdbSendDecision(pSynch, true) < 0) { tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; @@ -643,5 +651,27 @@ static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) { pSynch->pdf = &(pSynch->df); tsdbDecodeDFileSetEx(SYNC_BUFFER(pSynch), pSynch->pdf); + return 0; +} + +static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) { + if (isMfChanged) { + tsdbCloseMeta(pRepo); + tsdbFreeMeta(pRepo->tsdbMeta); + pRepo->tsdbMeta = tsdbNewMeta(REPO_CFG(pRepo)); + tsdbOpenMeta(pRepo); + tsdbLoadMetaCache(pRepo, true); + } + + tsdbUnRefMemTable(pRepo, pRepo->mem); + tsdbUnRefMemTable(pRepo, pRepo->imem); + pRepo->mem = NULL; + pRepo->imem = NULL; + + if (tsdbRestoreInfo(pRepo) < 0) { + tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + return 0; } \ No newline at end of file