diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d37f8f76c2..f93bf9a326 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -232,6 +232,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo); int32_t syncStart(int64_t rid); void syncStop(int64_t rid); void syncPreStop(int64_t rid); +void syncPostStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 28797c5361..93750ed585 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -98,6 +98,7 @@ bool vnodeShouldRollback(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode); +void vnodeSyncPostClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); bool vnodeIsLeader(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 9affd534c7..cea85a0c11 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,9 +245,12 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { vnodeQueryPreClose(pVnode); } +void vnodePreClose(SVnode *pVnode) { + vnodeQueryPreClose(pVnode); + vnodeSyncPreClose(pVnode); +} -void vnodePostClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } +void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 749c81224c..65e17cfaad 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -614,6 +614,11 @@ void vnodeSyncPreClose(SVnode *pVnode) { taosThreadMutexUnlock(&pVnode->lock); } +void vnodeSyncPostClose(SVnode *pVnode) { + vInfo("vgId:%d, post close sync", pVnode->config.vgId); + syncPostStop(pVnode->sync); +} + void vnodeSyncClose(SVnode *pVnode) { vInfo("vgId:%d, close sync", pVnode->config.vgId); syncStop(pVnode->sync); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3bf4a8d1cd..6793430923 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -228,6 +228,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); +void syncNodePostClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); int32_t syncNodeRestore(SSyncNode* pSyncNode); void syncHbTimerDataFree(SSyncHbTimerData* pData); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 32ae9e391c..5b9fb7c59c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -124,6 +124,14 @@ void syncPreStop(int64_t rid) { } } +void syncPostStop(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode != NULL) { + syncNodePostClose(pSyncNode); + syncNodeRelease(pSyncNode); + } +} + static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { if (!syncNodeInConfig(pSyncNode, pCfg)) return false; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; @@ -1236,6 +1244,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { } } +#if 0 if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); @@ -1246,6 +1255,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); pSyncNode->pNewNodeReceiver = NULL; } +#endif // stop elect timer syncNodeStopElectTimer(pSyncNode); @@ -1257,6 +1267,19 @@ void syncNodePreClose(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } +void syncNodePostClose(SSyncNode* pSyncNode) { + if (pSyncNode->pNewNodeReceiver != NULL) { + if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { + snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + } + + sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, + pSyncNode->pNewNodeReceiver); + snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); + pSyncNode->pNewNodeReceiver = NULL; + } +} + void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncNodeClose(SSyncNode* pSyncNode) {