From 5ac3398f3add7d99a2c5b6f82dc0cb93eee2fdf9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 21:36:31 +0800 Subject: [PATCH] Mnode intergate with wal module --- include/dnode/mnode/sdb/sdb.h | 8 ++ include/util/taoserror.h | 1 + source/dnode/mnode/impl/src/mndSync.c | 103 +++++++++++++++++++++++--- source/dnode/mnode/sdb/inc/sdbInt.h | 2 - source/libs/wal/inc/walInt.h | 2 + source/libs/wal/src/walRead.c | 2 + source/util/src/terror.c | 1 + 7 files changed, 106 insertions(+), 13 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index fa10d46878..48c8df5ba0 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -188,6 +188,14 @@ int32_t sdbDeploy(SSdb *pSdb); */ int32_t sdbReadFile(SSdb *pSdb); +/** + * @brief Write sdb file. + * + * @param pSdb The sdb object. + * @return int32_t 0 for success, -1 for failure. + */ +int32_t sdbWriteFile(SSdb *pSdb); + /** * @brief Parse and write raw data to sdb, then free the pRaw object * diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ae36ac7216..2dcc74213c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -160,6 +160,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339) #define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A) #define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B) +#define TSDB_CODE_SDB_INVALID_WAl_VER TAOS_DEF_ERROR_CODE(0, 0x033C) // mnode-dnode #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d3d4ef33e9..3f5bb77855 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndSync.h" +#include "mndTrans.h" -int32_t mndInitSync(SMnode *pMnode) { +static int32_t mndInitWal(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_init(&pMgmt->syncSem, 0, 0); char path[PATH_MAX] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); @@ -26,12 +26,95 @@ int32_t mndInitSync(SMnode *pMnode) { .fsyncPeriod = 0, .rollPeriod = -1, .segSize = -1, - .retentionPeriod = 0, - .retentionSize = 0, + .retentionPeriod = -1, + .retentionSize = -1, .level = TAOS_WAL_FSYNC}; pMgmt->pWal = walOpen(path, &cfg); - if (pMgmt->pWal == NULL) { - mError("failed to open wal in %s since %s", path, terrstr()); + if (pMgmt->pWal == NULL) return -1; + + return 0; +} + +static void mndCloseWal(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + pMgmt->pWal = NULL; + } +} + +static int32_t mndRestoreWal(SMnode *pMnode) { + SWal *pWal = pMnode->syncMgmt.pWal; + SSdb *pSdb = pMnode->pSdb; + int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); + int32_t code = -1; + + SWalReadHandle *pHandle = walOpenReadHandle(pWal); + if (pHandle == NULL) return -1; + + int64_t start = walGetFirstVer(pWal); + int64_t end = walGetLastVer(pWal); + start = MAX(lastSdbVer, start); + + for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) { + if (walReadWithHandle(pHandle, ver) < 0) { + mError("failed to read with wal handle since %s, ver:%" PRId64, terrstr(), ver); + goto WAL_RESTORE_OVER; + } + + SWalHead *pHead = pHandle->pHead; + int64_t sdbVer = sdbUpdateVer(pSdb, 0); + if (sdbVer + 1 != ver) { + terrno = TSDB_CODE_SDB_INVALID_WAl_VER; + mError("failed to write wal to sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver); + goto WAL_RESTORE_OVER; + } + + if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) { + mError("failed to write wal to sdb since %s, ver:%" PRId64, terrstr(), ver); + goto WAL_RESTORE_OVER; + } + + sdbUpdateVer(pSdb, 1); + } + + int64_t sdbVer = sdbUpdateVer(pSdb, 0); + if (sdbVer != lastSdbVer) { + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + + if (sdbVer != lastSdbVer) { + mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer); + if (sdbWriteFile(pSdb) != 0) { + goto WAL_RESTORE_OVER; + } + } + + if (walEndSnapshot(pWal) < 0) { + goto WAL_RESTORE_OVER; + } + } + + code = 0; + +WAL_RESTORE_OVER: + walCloseReadHandle(pHandle); + return 0; +} + +int32_t mndInitSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_init(&pMgmt->syncSem, 0, 0); + + if (mndInitWal(pMnode) < 0) { + mError("failed to open wal since %s", terrstr()); + return -1; + } + + if (mndRestoreWal(pMnode) < 0) { + mError("failed to restore wal since %s", terrstr()); return -1; } @@ -42,11 +125,8 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - if (pMgmt->pWal != NULL) { - walClose(pMgmt->pWal); - pMgmt->pWal = NULL; - tsem_destroy(&pMgmt->syncSem); - } + tsem_destroy(&pMgmt->syncSem); + mndCloseWal(pMnode); } static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { @@ -71,6 +151,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver); + walCommit(pWal, ver); walFsync(pWal, true); #if 1 diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 98c822cae8..25db988a0c 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -72,8 +72,6 @@ typedef struct SSdb { SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; -int32_t sdbWriteFile(SSdb *pSdb); - const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1579cad7b6..871c95193f 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -20,6 +20,8 @@ #include "tchecksum.h" #include "wal.h" +#include "taoserror.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c80fb4eed8..b5a30e4397 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -19,8 +19,10 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); if (pRead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pRead->pWal = pWal; pRead->readIdxTfd = -1; pRead->readLogTfd = -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9fa5b3198b..e821f1f803 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_WAl_VER, "Invalid wal version") // mnode-dnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists")