[TD-5512]<fix>: fix arbitrator offline caused by vnode destroying

This commit is contained in:
Minglei Jin 2021-07-23 18:43:42 +08:00
parent 9c23c91870
commit fa75676613
4 changed files with 10 additions and 9 deletions

View File

@ -25,7 +25,7 @@ typedef struct {
uint32_t serverIp;
int16_t port;
int32_t bufferSize;
void (*processBrokenLink)(int64_t handleId);
void (*processBrokenLink)(int64_t handleId, int32_t closedByApp);
int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(SOCKET fd, uint32_t ip);
} SPoolInfo;

View File

@ -30,7 +30,7 @@
extern void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd);
static void arbSignalHandler(int32_t signum, void *sigInfo, void *context);
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(int64_t rid);
static void arbProcessBrokenLink(int64_t rid, int32_t closedByApp);
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem;
static void * tsArbTcpPool;
@ -147,10 +147,10 @@ static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
return;
}
static void arbProcessBrokenLink(int64_t rid) {
static void arbProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SNodeConn *pNode = (SNodeConn *)rid;
sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno));
sDebug("%s, TCP link is broken since %s, closedByApp:%d", pNode->id, strerror(errno), closedByApp);
tfree(pNode);
}

View File

@ -43,7 +43,7 @@ static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
static void syncRecoverFromMaster(SSyncPeer *pPeer);
static void syncCheckPeerConnection(void *param, void *tmrId);
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(int64_t rid);
static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp);
static int32_t syncProcessPeerMsg(int64_t rid, void *buffer);
static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer);
@ -1308,7 +1308,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
pthread_mutex_unlock(&pNode->mutex);
}
static void syncProcessBrokenLink(int64_t rid) {
static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return;
@ -1316,9 +1316,10 @@ static void syncProcessBrokenLink(int64_t rid) {
pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd);
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d",
pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp);
pPeer->peerFd = -1;
if (pPeer->isArb) {
if (!closedByApp && pPeer->isArb) {
tsArbOnline = 0;
}

View File

@ -177,7 +177,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
SPoolInfo * pInfo = &pPool->info;
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
(*pInfo->processBrokenLink)(pConn->handleId);
(*pInfo->processBrokenLink)(pConn->handleId, pConn->closedByApp);
pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);