From c28ba32c93785938cb866503efab20480b573c8f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 4 Jan 2023 18:08:33 +0800 Subject: [PATCH 1/3] fix(vnode/mgmt): pre close vnode after queue's empty --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 6 +++--- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index f808c67ef6..9e91670d33 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -84,9 +84,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); - dInfo("vgId:%d, pre close", pVnode->vgId); - vnodePreClose(pVnode->pImpl); - dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId); while (pVnode->refCount > 0) taosMsleep(10); @@ -118,6 +115,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); + dInfo("vgId:%d, pre close", pVnode->vgId); + vnodePreClose(pVnode->pImpl); + vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d7b16c2c8e..c7a416d444 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1050,7 +1050,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB status = "offline"; } - char b1[9] = {0}; + char b1[16] = {0}; STR_TO_VARSTR(b1, status); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, b1, false); From 0dd9e4bdeec44fbc18aa8530290bbfbf656097a0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 5 Jan 2023 11:35:39 +0800 Subject: [PATCH 2/3] fix(vnode): new vnodePostClose for closing sync module --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 7 +++++-- source/dnode/vnode/inc/vnode.h | 7 ++++--- source/dnode/vnode/src/vnd/vnodeOpen.c | 7 +++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 9e91670d33..6b274261e5 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -84,6 +84,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); + dInfo("vgId:%d, pre close", pVnode->vgId); + vnodePreClose(pVnode->pImpl); + dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId); while (pVnode->refCount > 0) taosMsleep(10); @@ -115,8 +118,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); - dInfo("vgId:%d, pre close", pVnode->vgId); - vnodePreClose(pVnode->pImpl); + dInfo("vgId:%d, post close", pVnode->vgId); + vnodePostClose(pVnode->pImpl); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index cbee70ad03..e92daaabe8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -54,7 +54,8 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); -void vnodeSyncCheckTimeout(SVnode* pVnode); +void vnodePostClose(SVnode *pVnode); +void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); @@ -175,7 +176,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); -int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave); +int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); @@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader, const char* idstr); + uint64_t suid, void **pReader, const char *idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index bec5d2977b..9affd534c7 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,10 +245,9 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { - vnodeQueryPreClose(pVnode); - vnodeSyncPreClose(pVnode); -} +void vnodePreClose(SVnode *pVnode) { vnodeQueryPreClose(pVnode); } + +void vnodePostClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { From 7e5d97f0daa69fdef1805bd74d153fb4c99ea9a0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 5 Jan 2023 12:00:27 +0800 Subject: [PATCH 3/3] fix: stop snapshot receiver on sync post close --- include/libs/sync/sync.h | 1 + source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 7 +++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 5 +++++ source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 23 +++++++++++++++++++++++ 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d37f8f76c2..f93bf9a326 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -232,6 +232,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo); int32_t syncStart(int64_t rid); void syncStop(int64_t rid); void syncPreStop(int64_t rid); +void syncPostStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 28797c5361..93750ed585 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -98,6 +98,7 @@ bool vnodeShouldRollback(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode); +void vnodeSyncPostClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); bool vnodeIsLeader(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 9affd534c7..cea85a0c11 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,9 +245,12 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { vnodeQueryPreClose(pVnode); } +void vnodePreClose(SVnode *pVnode) { + vnodeQueryPreClose(pVnode); + vnodeSyncPreClose(pVnode); +} -void vnodePostClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } +void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 749c81224c..65e17cfaad 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -614,6 +614,11 @@ void vnodeSyncPreClose(SVnode *pVnode) { taosThreadMutexUnlock(&pVnode->lock); } +void vnodeSyncPostClose(SVnode *pVnode) { + vInfo("vgId:%d, post close sync", pVnode->config.vgId); + syncPostStop(pVnode->sync); +} + void vnodeSyncClose(SVnode *pVnode) { vInfo("vgId:%d, close sync", pVnode->config.vgId); syncStop(pVnode->sync); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3bf4a8d1cd..6793430923 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -228,6 +228,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode); +void syncNodePostClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq); int32_t syncNodeRestore(SSyncNode* pSyncNode); void syncHbTimerDataFree(SSyncHbTimerData* pData); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 32ae9e391c..5b9fb7c59c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -124,6 +124,14 @@ void syncPreStop(int64_t rid) { } } +void syncPostStop(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode != NULL) { + syncNodePostClose(pSyncNode); + syncNodeRelease(pSyncNode); + } +} + static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { if (!syncNodeInConfig(pSyncNode, pCfg)) return false; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; @@ -1236,6 +1244,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { } } +#if 0 if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); @@ -1246,6 +1255,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); pSyncNode->pNewNodeReceiver = NULL; } +#endif // stop elect timer syncNodeStopElectTimer(pSyncNode); @@ -1257,6 +1267,19 @@ void syncNodePreClose(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } +void syncNodePostClose(SSyncNode* pSyncNode) { + if (pSyncNode->pNewNodeReceiver != NULL) { + if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { + snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + } + + sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, + pSyncNode->pNewNodeReceiver); + snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); + pSyncNode->pNewNodeReceiver = NULL; + } +} + void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncNodeClose(SSyncNode* pSyncNode) {