Merge pull request #19369 from taosdata/fix/TD-21353
fix(vnode/mgmt): pre close vnode after queue's empty
This commit is contained in:
commit
e34378184d
|
@ -232,6 +232,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo);
|
|||
int32_t syncStart(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
void syncPreStop(int64_t rid);
|
||||
void syncPostStop(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||
|
|
|
@ -118,6 +118,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
|
||||
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
|
||||
|
||||
dInfo("vgId:%d, post close", pVnode->vgId);
|
||||
vnodePostClose(pVnode->pImpl);
|
||||
|
||||
vmFreeQueue(pMgmt, pVnode);
|
||||
vnodeClose(pVnode->pImpl);
|
||||
pVnode->pImpl = NULL;
|
||||
|
|
|
@ -1050,7 +1050,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
status = "offline";
|
||||
}
|
||||
|
||||
char b1[9] = {0};
|
||||
char b1[16] = {0};
|
||||
STR_TO_VARSTR(b1, status);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, b1, false);
|
||||
|
|
|
@ -54,7 +54,8 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
|
|||
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||
void vnodePreClose(SVnode *pVnode);
|
||||
void vnodeSyncCheckTimeout(SVnode* pVnode);
|
||||
void vnodePostClose(SVnode *pVnode);
|
||||
void vnodeSyncCheckTimeout(SVnode *pVnode);
|
||||
void vnodeClose(SVnode *pVnode);
|
||||
|
||||
int32_t vnodeStart(SVnode *pVnode);
|
||||
|
@ -175,7 +176,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
|
|||
void tsdbReaderClose(STsdbReader *pReader);
|
||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
|
@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
|
|||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
uint64_t suid, void **pReader, const char* idstr);
|
||||
uint64_t suid, void **pReader, const char *idstr);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
|
|
@ -98,6 +98,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
|
|||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||
void vnodeSyncPreClose(SVnode* pVnode);
|
||||
void vnodeSyncPostClose(SVnode* pVnode);
|
||||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
|
|
|
@ -250,6 +250,8 @@ void vnodePreClose(SVnode *pVnode) {
|
|||
vnodeSyncPreClose(pVnode);
|
||||
}
|
||||
|
||||
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
|
||||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
vnodeSyncCommit(pVnode);
|
||||
|
|
|
@ -614,6 +614,11 @@ void vnodeSyncPreClose(SVnode *pVnode) {
|
|||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
}
|
||||
|
||||
void vnodeSyncPostClose(SVnode *pVnode) {
|
||||
vInfo("vgId:%d, post close sync", pVnode->config.vgId);
|
||||
syncPostStop(pVnode->sync);
|
||||
}
|
||||
|
||||
void vnodeSyncClose(SVnode *pVnode) {
|
||||
vInfo("vgId:%d, close sync", pVnode->config.vgId);
|
||||
syncStop(pVnode->sync);
|
||||
|
|
|
@ -228,6 +228,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode);
|
|||
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||
void syncNodeClose(SSyncNode* pSyncNode);
|
||||
void syncNodePreClose(SSyncNode* pSyncNode);
|
||||
void syncNodePostClose(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
|
||||
int32_t syncNodeRestore(SSyncNode* pSyncNode);
|
||||
void syncHbTimerDataFree(SSyncHbTimerData* pData);
|
||||
|
|
|
@ -124,6 +124,14 @@ void syncPreStop(int64_t rid) {
|
|||
}
|
||||
}
|
||||
|
||||
void syncPostStop(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
syncNodePostClose(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
}
|
||||
|
||||
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
||||
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
||||
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
||||
|
@ -1236,6 +1244,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (pSyncNode->pNewNodeReceiver != NULL) {
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
|
||||
|
@ -1246,6 +1255,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
|
||||
pSyncNode->pNewNodeReceiver = NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
|
@ -1257,6 +1267,19 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
|
|||
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
|
||||
}
|
||||
|
||||
void syncNodePostClose(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->pNewNodeReceiver != NULL) {
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
|
||||
}
|
||||
|
||||
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
|
||||
pSyncNode->pNewNodeReceiver);
|
||||
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
|
||||
pSyncNode->pNewNodeReceiver = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
|
||||
|
||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||
|
|
Loading…
Reference in New Issue