From a82d594a81b9efb54153921193f6b66e565a7250 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 Nov 2020 09:48:48 +0000 Subject: [PATCH] TD-2157 --- src/inc/tsync.h | 18 ++++++------- src/sync/inc/syncInt.h | 30 ++++++++++++--------- src/sync/src/syncMain.c | 59 +++++++++++++++++++++++++++++++---------- 3 files changed, 71 insertions(+), 36 deletions(-) diff --git a/src/inc/tsync.h b/src/inc/tsync.h index d57433eba9..967b254992 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -24,18 +24,18 @@ extern "C" { #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF typedef enum _TAOS_SYNC_ROLE { - TAOS_SYNC_ROLE_OFFLINE, - TAOS_SYNC_ROLE_UNSYNCED, - TAOS_SYNC_ROLE_SYNCING, - TAOS_SYNC_ROLE_SLAVE, - TAOS_SYNC_ROLE_MASTER, + TAOS_SYNC_ROLE_OFFLINE = 0, + TAOS_SYNC_ROLE_UNSYNCED = 1, + TAOS_SYNC_ROLE_SYNCING = 2, + TAOS_SYNC_ROLE_SLAVE = 3, + TAOS_SYNC_ROLE_MASTER = 4 } ESyncRole; typedef enum _TAOS_SYNC_STATUS { - TAOS_SYNC_STATUS_INIT, - TAOS_SYNC_STATUS_START, - TAOS_SYNC_STATUS_FILE, - TAOS_SYNC_STATUS_CACHE, + TAOS_SYNC_STATUS_INIT = 0, + TAOS_SYNC_STATUS_START = 1, + TAOS_SYNC_STATUS_FILE = 2, + TAOS_SYNC_STATUS_CACHE = 3 } ESyncStatus; typedef struct { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 666a90c3cb..7d846ebc80 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -27,16 +27,20 @@ extern "C" { #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} -#define TAOS_SMSG_SYNC_DATA 1 -#define TAOS_SMSG_FORWARD 2 -#define TAOS_SMSG_FORWARD_RSP 3 -#define TAOS_SMSG_SYNC_REQ 4 -#define TAOS_SMSG_SYNC_RSP 5 -#define TAOS_SMSG_SYNC_MUST 6 -#define TAOS_SMSG_STATUS 7 +typedef enum { + TAOS_SMSG_SYNC_DATA = 1, + TAOS_SMSG_FORWARD = 2, + TAOS_SMSG_FORWARD_RSP = 3, + TAOS_SMSG_SYNC_REQ = 4, + TAOS_SMSG_SYNC_RSP = 5, + TAOS_SMSG_SYNC_MUST = 6, + TAOS_SMSG_STATUS = 7 +} ESyncMsgType; #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024) +#define SYNC_FWD_TIMER 300 +#define SYNC_ROLE_TIMER 10000 #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version @@ -123,12 +127,12 @@ typedef struct SsyncPeer { int32_t nodeId; uint32_t ip; uint16_t port; + int8_t role; + int8_t sstatus; // sync status char fqdn[TSDB_FQDN_LEN]; // peer ip string char id[TSDB_EP_LEN + 32]; // peer vgId + end point - int8_t role; - int8_t sstatus; // sync status uint64_t version; - uint64_t sversion; // track the peer version in retrieve process + uint64_t sversion; // track the peer version in retrieve process int32_t syncFd; int32_t peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried @@ -138,7 +142,7 @@ typedef struct SsyncPeer { int32_t notifyFd; int32_t watchNum; int32_t *watchFd; - int8_t refCount; // reference count + int32_t refCount; // reference count struct SSyncNode *pSyncNode; } SSyncPeer; @@ -146,16 +150,16 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; int8_t replica; int8_t quorum; + int8_t selfIndex; uint32_t vgId; int64_t rid; void *ahandle; - int8_t selfIndex; SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator SSyncPeer *pMaster; - int8_t refCount; SRecvBuffer *pRecv; SSyncFwds *pSyncFwds; // saved forward info if quorum >1 void *pFwdTimer; + void *pRoleTimer; FGetFileInfo getFileInfo; FGetWalInfo getWalInfo; FWriteToCache writeToCache; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 10ed9f6c27..e7086626d6 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -59,6 +59,7 @@ static void syncAddArbitrator(SSyncNode *pNode); static void syncFreeNode(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); +static void syncMonitorNodeRole(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); @@ -79,7 +80,9 @@ typedef enum { SYNC_STATUS_SETUP_CONN, SYNC_STATUS_SETUP_CONN_RSP, SYNC_STATUS_EXCHANGE_DATA, - SYNC_STATUS_EXCHANGE_DATA_RSP + SYNC_STATUS_EXCHANGE_DATA_RSP, + SYNC_STATUS_CHECK_ROLE, + SYNC_STATUS_CHECK_ROLE_RSP } ESyncStatusType; char *statusType[] = { @@ -88,7 +91,9 @@ char *statusType[] = { "setup-conn", "setup-conn-rsp", "exchange-data", - "exchange-data-rsp" + "exchange-data-rsp", + "check-role", + "check-role-rsp" }; uint16_t syncGenTranId() { @@ -233,9 +238,16 @@ int64_t syncStart(const SSyncInfo *pInfo) { return -1; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); if (pNode->pFwdTimer == NULL) { - sError("vgId:%d, failed to allocate timer", pNode->vgId); + sError("vgId:%d, failed to allocate fwd timer", pNode->vgId); + syncStop(pNode->rid); + return -1; + } + + pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); + if (pNode->pRoleTimer == NULL) { + sError("vgId:%d, failed to allocate role timer", pNode->vgId); syncStop(pNode->rid); return -1; } @@ -262,6 +274,7 @@ void syncStop(int64_t rid) { if (tsVgIdHash) taosHashRemove(tsVgIdHash, (const char *)&pNode->vgId, sizeof(int32_t)); if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer); + if (pNode->pRoleTimer) taosTmrStop(pNode->pRoleTimer); for (int32_t i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; @@ -471,10 +484,10 @@ static void syncFreeNode(void *param) { tfree(pNode); } -void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } +void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_32(&pPeer->refCount, 1); } int32_t syncDecPeerRef(SSyncPeer *pPeer) { - if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { + if (atomic_sub_fetch_32(&pPeer->refCount, 1) == 0) { taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); @@ -699,20 +712,20 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new int8_t syncRequired = 0; pPeer->role = newPeerRole; - sTrace("%s, peer role:%s change to %s", pPeer->id, syncRole[oldPeerRole], syncRole[newPeerRole]); + sDebug("%s, peer role:%s change to %s", pPeer->id, syncRole[oldPeerRole], syncRole[newPeerRole]); SSyncPeer *pMaster = syncCheckMaster(pNode); if (pMaster) { // master is there pNode->pMaster = pMaster; - sTrace("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version); + sDebug("%s, it is the master, sver:%" PRIu64, pMaster->id, pMaster->version); if (syncValidateMaster(pPeer) < 0) return; if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeVersion < pMaster->version) { - sTrace("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion); + sDebug("%s, is master, sync required, self sver:%" PRIu64, pMaster->id, nodeVersion); syncRequired = 1; } else { sInfo("%s, is master, work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); @@ -720,7 +733,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new (*pNode->notifyRole)(pNode->ahandle, nodeRole); } } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { - sTrace("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); + sDebug("%s, is master, continue work as slave, self sver:%" PRIu64, pMaster->id, nodeVersion); } } else { // master not there, if all peer's state and version are consistent, choose the master @@ -739,10 +752,10 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new } if (consistent) { - sTrace("vgId:%d, choose master", pNode->vgId); + sDebug("vgId:%d, choose master", pNode->vgId); syncChooseMaster(pNode); } else { - sTrace("vgId:%d, version inconsistent, cannot choose master", pNode->vgId); + sDebug("vgId:%d, version inconsistent, cannot choose master", pNode->vgId); } } @@ -1221,8 +1234,26 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } } +static void syncMonitorNodeRole(void *param, void *tmrId) { + int64_t rid = (int64_t)param; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; + + for (int32_t index = 0; index < pNode->replica; index++) { + if (index == pNode->selfIndex) continue; + + SSyncPeer *pPeer = pNode->peerInfo[index]; + if (pPeer->role <= TAOS_SYNC_ROLE_UNSYNCED || nodeRole <= TAOS_SYNC_ROLE_UNSYNCED) { + syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); + } + } + + pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); + taosReleaseRef(tsSyncRefId, rid); +} + static void syncMonitorFwdInfos(void *param, void *tmrId) { - int64_t rid = (int64_t) param; + int64_t rid = (int64_t)param; SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); if (pNode == NULL) return; @@ -1246,7 +1277,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); } taosReleaseRef(tsSyncRefId, rid);