From f7ad45fe32d463e2ec8e786c60976e5ced7e98e8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 29 Oct 2020 08:58:26 +0000 Subject: [PATCH 1/3] race condition for timer and forward --- src/sync/src/syncMain.c | 250 ++++++++++++++++++++++++---------------- 1 file changed, 148 insertions(+), 102 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef635e6efc..17815c93aa 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -20,6 +20,7 @@ #include "tlog.h" #include "tutil.h" #include "ttimer.h" +#include "tref.h" #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" @@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN]; static ttpool_h tsTcpPool; static void * syncTmrCtrl = NULL; static void * vgIdHash; +static int tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); -static void syncAddNodeRef(SSyncNode *pNode); -static void syncDecNodeRef(SSyncNode *pNode); +static void syncFreeNode(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); char* syncRole[] = { @@ -106,6 +108,12 @@ int32_t syncInit() { return -1; } + tsSyncRefId = taosOpenRef(200, syncFreeNode); + if (tsSyncRefId < 0) { + syncCleanUp(); + return -1; + } + tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); sInfo("sync module initialized successfully"); @@ -128,6 +136,8 @@ void syncCleanUp() { vgIdHash = NULL; } + taosCloseRef(tsSyncRefId); + sInfo("sync module is cleaned up"); } @@ -159,6 +169,12 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; + int ret = taosAddRef(tsSyncRefId, pNode); + if (ret < 0) { + syncFreeNode(pNode); + return NULL; + } + for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); @@ -167,8 +183,6 @@ void *syncStart(const SSyncInfo *pInfo) { } } - syncAddNodeRef(pNode); - if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; @@ -210,7 +224,9 @@ void syncStop(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - if (pNode == NULL) return; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + sInfo("vgId:%d, cleanup sync", pNode->vgId); pthread_mutex_lock(&(pNode->mutex)); @@ -228,14 +244,17 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); + taosRemoveRef(tsSyncRefId, pNode); } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { SSyncNode *pNode = param; int i, j; - if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -298,105 +317,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); + taosReleaseRef(tsSyncRefId, pNode); + return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { SSyncNode *pNode = param; - SSyncPeer *pPeer; - SSyncHead *pSyncHead; - SWalHead * pWalHead = data; - int fwdLen; - int code = 0; - if (pNode == NULL) return 0; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); - } - return TSDB_CODE_SYN_INVALID_VERSION; - } + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - // always update version - nodeVersion = pWalHead->version; - sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], - qtype, pWalHead->version); - - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - - // only pkt from RPC or CQ can be forwarded - if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; - - // a hacker way to improve the performance - pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); - pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - - pthread_mutex_lock(&(pNode->mutex)); - - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd < 0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - - if (pNode->quorum > 1 && code == 0) { - syncSaveFwdInfo(pNode, pWalHead->version, mhandle); - code = 1; - } - - int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); - if (retLen == fwdLen) { - sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); - } else { - sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); - syncRestartConnection(pPeer); - } - } - - pthread_mutex_unlock(&(pNode->mutex)); + taosReleaseRef(tsSyncRefId, pNode); return code; } void syncConfirmForward(void *param, uint64_t version, int32_t code) { SSyncNode *pNode = param; - if (pNode == NULL) return; - if (pNode->quorum <= 1) return; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; SSyncPeer *pPeer = pNode->pMaster; - if (pPeer == NULL) return; + if (pPeer && pNode->quorum > 1) { + char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SSyncHead *pHead = (SSyncHead *)msg; + pHead->type = TAOS_SMSG_FORWARD_RSP; + pHead->len = sizeof(SFwdRsp); - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); + SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); + pFwdRsp->version = version; + pFwdRsp->code = code; - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; + int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int retLen = write(pPeer->peerFd, msg, msgLen); - int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int retLen = write(pPeer->peerFd, msg, msgLen); - - if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); - } else { - sDebug("%s, failed to send forward ack, restart", pPeer->id); - syncRestartConnection(pPeer); + if (retLen == msgLen) { + sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + } else { + sDebug("%s, failed to send forward ack, restart", pPeer->id); + syncRestartConnection(pPeer); + } } + + taosReleaseRef(tsSyncRefId, pNode); } void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -414,17 +391,24 @@ void syncRecover(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); + + taosReleaseRef(tsSyncRefId, pNode); } int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return -1; + pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } + taosReleaseRef(tsSyncRefId, pNode); + return 0; } @@ -457,22 +441,20 @@ static void syncAddArbitrator(SSyncNode *pNode) { pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } +static void syncFreeNode(void *param) { + SSyncNode *pNode = param; -static void syncDecNodeRef(SSyncNode *pNode) { - if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { - pthread_mutex_destroy(&pNode->mutex); - taosTFree(pNode->pRecv); - taosTFree(pNode->pSyncFwds); - taosTFree(pNode); - } + pthread_mutex_destroy(&pNode->mutex); + taosTFree(pNode->pRecv); + taosTFree(pNode->pSyncFwds); + taosTFree(pNode); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - syncDecNodeRef(pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -529,7 +511,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - syncAddNodeRef(pNode); + taosAcquireRef(tsSyncRefId, pNode); return pPeer; } @@ -1122,7 +1104,6 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - syncAddNodeRef(pNode); pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1133,7 +1114,6 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1202,22 +1182,88 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if ( ret < 0) return; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; - if (pSyncFwds == NULL) return; - uint64_t time = taosGetTimestampMs(); + if (pSyncFwds) {; + uint64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; - if (time - pFwdInfo->time < 2000) break; - syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (pSyncFwds->fwds > 0) { + pthread_mutex_lock(&(pNode->mutex)); + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + if (time - pFwdInfo->time < 2000) break; + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + syncRemoveConfirmedFwdInfo(pNode); + pthread_mutex_unlock(&(pNode->mutex)); } - syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&(pNode->mutex)); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + } +} + +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int32_t code = 0; + + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + // always update version + nodeVersion = pWalHead->version; + sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], + qtype, pWalHead->version); + + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + + // only pkt from RPC or CQ can be forwarded + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + + // a hacker way to improve the performance + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead->type = TAOS_SMSG_FORWARD; + pSyncHead->pversion = 0; + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + + if (pNode->quorum > 1 && code == 0) { + syncSaveFwdInfo(pNode, pWalHead->version, mhandle); + code = 1; + } + + int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + if (retLen == fwdLen) { + sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + } else { + sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; } + From 98a7e88c45325c911ee28fb396403a5839cace09 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 29 Oct 2020 09:14:42 +0000 Subject: [PATCH 2/3] releaseRef --- src/sync/src/syncMain.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 17815c93aa..a538b4db87 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -137,6 +137,7 @@ void syncCleanUp() { } taosCloseRef(tsSyncRefId); + tsSyncRefId = -1; sInfo("sync module is cleaned up"); } @@ -1205,6 +1206,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } + + taosReleaseRef(tsSyncRefId, pNode); } static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { From 5d453ae329762e43f6d023b77a99bc198c83d2e8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 30 Oct 2020 00:03:36 +0000 Subject: [PATCH 3/3] add acquire/release for brokenlink processing --- src/sync/src/syncMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index a538b4db87..6f5e3be8ab 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1105,6 +1105,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; + if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1115,6 +1116,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); + taosReleaseRef(tsSyncRefId, pNode); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {