From f6c676a174075560f2881a3e8665f1d27fcfc1ba Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 28 May 2022 11:11:48 +0800 Subject: [PATCH] enh: sdb snapshot --- source/dnode/mnode/impl/inc/mndInt.h | 61 ++++++----- .../mnode/impl/src/{mnode.c => mndMain.c} | 2 +- source/dnode/mnode/impl/src/mndSync.c | 8 +- source/dnode/mnode/sdb/inc/sdb.h | 2 +- source/dnode/mnode/sdb/src/sdb.c | 6 +- source/dnode/mnode/sdb/src/sdbFile.c | 103 ++++++++++++++---- 6 files changed, 120 insertions(+), 62 deletions(-) rename source/dnode/mnode/impl/src/{mnode.c => mndMain.c} (99%) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index cec49a4cbe..415a42ee79 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,13 +75,12 @@ typedef struct { } STelemMgmt; typedef struct { - SWal *pWal; - sem_t syncSem; - int64_t sync; - bool standby; - bool restored; - int32_t errCode; - int32_t transId; + SWal *pWal; + sem_t syncSem; + int64_t sync; + bool standby; + int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { @@ -90,29 +89,31 @@ typedef struct { } SGrantInfo; typedef struct SMnode { - int32_t selfDnodeId; - int64_t clusterId; - TdThread thread; - bool deploy; - bool stopped; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - char *path; - int64_t checkTime; - SSdb *pSdb; - SMgmtWrapper *pWrapper; - SArray *pSteps; - SQHandle *pQuery; - SShowMgmt showMgmt; - SProfileMgmt profileMgmt; - STelemMgmt telemMgmt; - SSyncMgmt syncMgmt; - SHashObj *infosMeta; - SHashObj *perfsMeta; - SGrantInfo grant; - MndMsgFp msgFp[TDMT_MAX]; - SMsgCb msgCb; + int32_t selfDnodeId; + int64_t clusterId; + TdThread thread; + TdThreadRwlock lock; + int32_t refCount; + bool stopped; + bool restored; + bool deploy; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + char *path; + int64_t checkTime; + SSdb *pSdb; + SArray *pSteps; + SQHandle *pQuery; + SHashObj *infosMeta; + SHashObj *perfsMeta; + SShowMgmt showMgmt; + SProfileMgmt profileMgmt; + STelemMgmt telemMgmt; + SSyncMgmt syncMgmt; + SGrantInfo grant; + MndMsgFp msgFp[TDMT_MAX]; + SMsgCb msgCb; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mndMain.c similarity index 99% rename from source/dnode/mnode/impl/src/mnode.c rename to source/dnode/mnode/impl/src/mndMain.c index 5458bc5126..25b5e6967f 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -336,7 +336,7 @@ int32_t mndStart(SMnode *pMnode) { mndSyncStart(pMnode); if (pMnode->deploy) { if (sdbDeploy(pMnode->pSdb) != 0) return -1; - pMnode->syncMgmt.restored = true; + pMnode->restored = true; } return mndInitTimer(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e25aa468bf..b3204e7bac 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { if (!pMnode->deploy) { mInfo("mnode sync restore finished"); mndTransPullup(pMnode); - pMnode->syncMgmt.restored = true; + pMnode->restored = true; } } @@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; - pMnode->syncMgmt.restored = false; + pMnode->restored = false; mInfo("start to apply snapshot to sdb, len:%d", len); int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); @@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char mError("failed to apply snapshot to sdb, len:%d", len); } else { mInfo("successfully to apply snapshot to sdb, len:%d", len); - pMnode->syncMgmt.restored = true; + pMnode->restored = true; } // taosMemoryFree(pBuf); @@ -250,7 +250,7 @@ bool mndIsMaster(SMnode *pMnode) { return false; } - if (!pMgmt->restored) { + if (!pMnode->restored) { terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 32e29f5e1c..411d4c59ea 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -166,7 +166,6 @@ typedef struct SSdbRow { typedef struct SSdb { SMnode *pMnode; char *currDir; - char *syncDir; char *tmpDir; int64_t lastCommitVer; int64_t curVer; @@ -182,6 +181,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + TdThreadMutex filelock; } SSdb; typedef struct SSdbIter { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index d289e30d7b..aef3476440 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -56,6 +56,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { pSdb->curTerm = -1; pSdb->lastCommitVer = -1; pSdb->pMnode = pOption->pMnode; + taosThreadMutexInit(&pSdb->filelock, NULL); mDebug("sdb init successfully"); return pSdb; } @@ -69,10 +70,6 @@ void sdbCleanup(SSdb *pSdb) { taosMemoryFreeClear(pSdb->currDir); } - if (pSdb->syncDir != NULL) { - taosMemoryFreeClear(pSdb->syncDir); - } - if (pSdb->tmpDir != NULL) { taosMemoryFreeClear(pSdb->tmpDir); } @@ -104,6 +101,7 @@ void sdbCleanup(SSdb *pSdb) { mDebug("sdb table:%s is cleaned up", sdbTableName(i)); } + taosThreadMutexDestroy(&pSdb->filelock); taosMemoryFree(pSdb); mDebug("sdb is cleaned up"); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 46abd5e8b2..b2dcbd68e3 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -22,13 +22,14 @@ #define SDB_RESERVE_SIZE 512 #define SDB_FILE_VER 1 -static int32_t sdbRunDeployFp(SSdb *pSdb) { +static int32_t sdbDeployData(SSdb *pSdb) { mDebug("start to deploy sdb"); for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; + mDebug("start to deploy sdb:%s", sdbTableName(i)); if ((*fp)(pSdb->pMnode) != 0) { mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr()); return -1; @@ -39,6 +40,39 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { return 0; } +static void sdbResetData(SSdb *pSdb) { + mDebug("start to reset sdb"); + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL) continue; + + sdbFreeRow(pSdb, pRow, true); + ppRow = taosHashIterate(hash, ppRow); + } + } + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + taosHashClear(pSdb->hashObjs[i]); + pSdb->tableVer[i] = 0; + pSdb->maxId[i] = 0; + mDebug("sdb:%s is reset", sdbTableName(i)); + } + + pSdb->curVer = -1; + pSdb->curTerm = -1; + pSdb->lastCommitVer = -1; + mDebug("sdb reset successfully"); +} + static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { int64_t sver = 0; int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t)); @@ -169,11 +203,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { return 0; } -int32_t sdbReadFile(SSdb *pSdb) { +static int32_t sdbReadFileImp(SSdb *pSdb) { int64_t offset = 0; int32_t code = 0; int32_t readLen = 0; int64_t ret = 0; + char file[PATH_MAX] = {0}; + + snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); + mDebug("start to read file:%s", file); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); if (pRaw == NULL) { @@ -182,10 +220,6 @@ int32_t sdbReadFile(SSdb *pSdb) { return -1; } - char file[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to read file:%s", file); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { taosMemoryFree(pRaw); @@ -196,8 +230,6 @@ int32_t sdbReadFile(SSdb *pSdb) { if (sdbReadFileHead(pSdb, pFile) != 0) { mError("failed to read file:%s head since %s", file, terrstr()); - pSdb->curVer = -1; - pSdb->curTerm = -1; taosMemoryFree(pRaw); taosCloseFile(&pFile); return -1; @@ -264,6 +296,20 @@ _OVER: return code; } +int32_t sdbReadFile(SSdb *pSdb) { + taosThreadMutexLock(&pSdb->filelock); + + sdbResetData(pSdb); + int32_t code = sdbReadFileImp(pSdb); + if (code != 0) { + mError("failed to read sdb since %s", terrstr()); + sdbResetData(pSdb); + } + + taosThreadMutexUnlock(&pSdb->filelock); + return code; +} + static int32_t sdbWriteFileImp(SSdb *pSdb) { int32_t code = 0; @@ -378,15 +424,21 @@ int32_t sdbWriteFile(SSdb *pSdb) { return 0; } - return sdbWriteFileImp(pSdb); + taosThreadMutexLock(&pSdb->filelock); + int32_t code = sdbWriteFileImp(pSdb); + if (code != 0) { + mError("failed to write sdb since %s", terrstr()); + } + taosThreadMutexUnlock(&pSdb->filelock); + return code; } int32_t sdbDeploy(SSdb *pSdb) { - if (sdbRunDeployFp(pSdb) != 0) { + if (sdbDeployData(pSdb) != 0) { return -1; } - if (sdbWriteFileImp(pSdb) != 0) { + if (sdbWriteFile(pSdb) != 0) { return -1; } @@ -397,13 +449,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + taosThreadMutexLock(&pSdb->filelock); if (taosCopyFile(datafile, tmpfile) != 0) { + taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); return NULL; } + taosThreadMutexUnlock(&pSdb->filelock); SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); if (pIter == NULL) { @@ -422,11 +477,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { return pIter; } -static void sdbCloseIter(SSdbIter *pIter) { +static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) { if (pIter == NULL) return; if (pIter->file != NULL) { taosCloseFile(&pIter->file); } + + char tmpfile[PATH_MAX] = {0}; + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + taosRemoveFile(tmpfile); + taosMemoryFree(pIter); mInfo("sdbiter:%p, is closed", pIter); } @@ -453,15 +513,14 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { } int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { - const int32_t maxlen = 100; - SSdbIter *pIter = sdbGetIter(pSdb, ppIter); if (pIter == NULL) return -1; - char *pBuf = taosMemoryCalloc(1, maxlen); + int32_t maxlen = 100; + char *pBuf = taosMemoryCalloc(1, maxlen); if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); return -1; } @@ -472,7 +531,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return -1; } else if (readlen == 0) { @@ -480,7 +539,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return 0; } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { @@ -494,7 +553,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = pBuf; *len = readlen; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); return 0; } else { // impossible @@ -502,7 +561,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le *ppBuf = NULL; *len = 0; *ppIter = NULL; - sdbCloseIter(pIter); + sdbCloseIter(pSdb, pIter); taosMemoryFree(pBuf); return -1; } @@ -512,7 +571,7 @@ int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) { char datafile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0}; snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); + snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) {