From 102a08f44be0445e2ea34c3485dd532250765aae Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 20:26:12 +0800 Subject: [PATCH 1/3] enh(sync) sync/mnode integration, add log --- source/libs/sync/src/syncAppendEntries.c | 1 + source/libs/sync/src/syncCommit.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ef66d580fb..5f90325f65 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -368,6 +368,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->restoreFinish = true; tsem_post(&ths->restoreSem); + sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post"); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index aa4bcb4fae..f5486cd1f0 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -145,6 +145,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->restoreFinish = true; tsem_post(&pSyncNode->restoreSem); + sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post"); } } From 1d7bdbc351cf788bf445eb28f0ebc3616061763e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 21:37:13 +0800 Subject: [PATCH 2/3] fix(sync) sync/mnode integration dead lock --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 5 ++++- source/libs/sync/src/syncCommit.c | 5 ++++- source/libs/sync/src/syncMain.c | 27 ++++++++++++++++++++++-- source/libs/transport/inc/transComm.h | 2 +- 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d3345620e9..9246041b81 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -149,7 +149,7 @@ typedef struct SSyncNode { // restore state bool restoreFinish; - sem_t restoreSem; + //sem_t restoreSem; SSnapshot* pSnapshot; } SSyncNode; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 5f90325f65..fa735e71c0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -366,9 +366,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pFsm->FpRestoreFinish(ths->pFsm); } ths->restoreFinish = true; + sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); + /* tsem_post(&ths->restoreSem); - sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post"); + sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths); + */ } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f5486cd1f0..18c6f8930a 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -143,9 +143,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; + sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); + /* tsem_post(&pSyncNode->restoreSem); - sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post"); + sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode); + */ } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bffebc1693..821ed364e8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -499,7 +499,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); } - tsem_init(&(pSyncNode->restoreSem), 0, 0); + //tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart // start raft @@ -521,7 +521,19 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + /* + sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + + sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId); return; } @@ -532,7 +544,18 @@ void syncNodeStart(SSyncNode* pSyncNode) { // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + /* + sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -573,7 +596,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pSnapshot); } - tsem_destroy(&pSyncNode->restoreSem); + //tsem_destroy(&pSyncNode->restoreSem); // free memory in syncFreeNode // taosMemoryFree(pSyncNode); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e71e19edce..30f799f39e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,7 +94,7 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit +#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_CONN_TIMEOUT 3 // connect timeout From 43f2a51e4b01033182b5dc3b9ff0940fdb390d16 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 22:08:14 +0800 Subject: [PATCH 3/3] fix(sync) sync/mnode eq msg --- source/dnode/mnode/impl/src/mndSync.c | 8 +++++++- source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b7cae58b12..63809a3d76 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,13 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 538d3f7f87..882ee912cd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }