From 3efe621526c2b94367108a0ff72e6fb128dfbdfc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 3 Jan 2022 17:44:47 +0800 Subject: [PATCH] integration with wal module --- include/dnode/mnode/sdb/sdb.h | 13 ++++++++-- source/dnode/mnode/impl/CMakeLists.txt | 1 + source/dnode/mnode/impl/inc/mndInt.h | 3 +++ source/dnode/mnode/impl/src/mndSync.c | 34 +++++++++++++++++++++++++- source/dnode/mnode/sdb/inc/sdbInt.h | 1 + source/dnode/mnode/sdb/src/sdb.c | 5 ++++ 6 files changed, 54 insertions(+), 3 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 9373e258be..fa10d46878 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -260,7 +260,7 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 * * @param pSdb The sdb object. * @param pIter The type of the table. - * @record int32_t The number of rows in the table + * @return int32_t The number of rows in the table */ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); @@ -269,10 +269,19 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); * * @param pSdb The sdb object. * @param pIter The type of the table. - * @record int32_t The max id of the table + * @return int32_t The max id of the table */ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); +/** + * @brief Update the version of sdb + * + * @param pSdb The sdb object. + * @param val The update value of the version. + * @return int32_t The current version of sdb + */ +int64_t sdbUpdateVer(SSdb *pSdb, int32_t val); + SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 98c604e520..6768651922 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -8,6 +8,7 @@ target_include_directories( target_link_libraries( mnode PRIVATE sdb + PRIVATE wal PRIVATE transport PRIVATE cjson PRIVATE sync diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 15ff65a8fc..5c8d409d90 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -17,11 +17,13 @@ #define _TD_MND_INT_H_ #include "mndDef.h" + #include "sdb.h" #include "tcache.h" #include "tep.h" #include "tqueue.h" #include "ttime.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -65,6 +67,7 @@ typedef struct { typedef struct { int32_t errCode; sem_t syncSem; + SWal *pWal; SSyncNode *pSyncNode; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6a2fca836f..d3d4ef33e9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -20,6 +20,21 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_init(&pMgmt->syncSem, 0, 0); + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); + SWalCfg cfg = {.vgId = 1, + .fsyncPeriod = 0, + .rollPeriod = -1, + .segSize = -1, + .retentionPeriod = 0, + .retentionSize = 0, + .level = TAOS_WAL_FSYNC}; + pMgmt->pWal = walOpen(path, &cfg); + if (pMgmt->pWal == NULL) { + mError("failed to open wal in %s since %s", path, terrstr()); + return -1; + } + pMgmt->state = TAOS_SYNC_STATE_LEADER; pMgmt->pSyncNode = NULL; return 0; @@ -27,7 +42,11 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_destroy(&pMgmt->syncSem); + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + pMgmt->pWal = NULL; + tsem_destroy(&pMgmt->syncSem); + } } static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { @@ -41,6 +60,19 @@ static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSync } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { + SWal *pWal = pMnode->syncMgmt.pWal; + SSdb *pSdb = pMnode->pSdb; + + int64_t ver = sdbUpdateVer(pSdb, 1); + if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { + sdbUpdateVer(pSdb, -1); + mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver); + return -1; + } + + mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver); + walFsync(pWal, true); + #if 1 return 0; #else diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 070aa56944..98c822cae8 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -59,6 +59,7 @@ typedef struct SSdb { char *tmpDir; int64_t lastCommitVer; int64_t curVer; + int64_t tableVer[SDB_MAX]; int32_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; SHashObj *hashObjs[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 97bc0ecbdb..7df5052d6e 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -159,3 +159,8 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } + +int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { + pSdb->curVer += val; + return val; +} \ No newline at end of file