From e8d9017dc765c8139921d0dd9fa0ad04d1c1c09c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Dec 2020 17:11:02 +0800 Subject: [PATCH] TD-2428 --- src/sync/inc/syncInt.h | 22 +++++---- src/sync/src/syncArbitrator.c | 16 +++---- src/sync/src/syncMain.c | 87 ++++++++++++++++++----------------- src/sync/src/syncRestore.c | 8 ++-- src/sync/src/syncRetrieve.c | 30 ++++++------ src/sync/test/syncServer.c | 4 +- 6 files changed, 87 insertions(+), 80 deletions(-) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 2be25447c4..c00015552f 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -43,6 +43,7 @@ typedef enum { #define SYNC_FWD_TIMER 300 #define SYNC_ROLE_TIMER 10000 #define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 +#define SYNC_PROTOCOL_VERSION 0 #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version @@ -51,27 +52,27 @@ typedef enum { #pragma pack(push, 1) typedef struct { - char type; // msg type - char pversion; // protocol version - char reserved[6]; // not used + int8_t type; // msg type + int8_t protocol; // protocol version + int8_t reserved[6]; // not used int32_t vgId; // vg ID int32_t len; // content length, does not include head - // char cont[]; // message content starts from here } SSyncHead; typedef struct { - SSyncHead syncHead; + SSyncHead head; uint16_t port; uint16_t tranId; char fqdn[TSDB_FQDN_LEN]; int32_t sourceId; // only for arbitrator -} SFirstPkt; +} SSyncMsg; typedef struct { - int8_t sync; - int8_t reserved; - uint16_t tranId; -} SFirstPktRsp; + SSyncHead head; + int8_t sync; + int8_t reserved; + uint16_t tranId; +} SSyncRsp; typedef struct { int8_t role; @@ -101,6 +102,7 @@ typedef struct { } SFileAck; typedef struct { + SSyncHead head; uint64_t version; int32_t code; } SFwdRsp; diff --git a/src/sync/src/syncArbitrator.c b/src/sync/src/syncArbitrator.c index 971fac26aa..1cb2b8f302 100644 --- a/src/sync/src/syncArbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); - SFirstPkt firstPkt; - if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); + SSyncMsg msg; + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } @@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } - firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; - snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); - if (firstPkt.syncHead.vgId) { + msg.fqdn[TSDB_FQDN_LEN - 1] = 0; + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port); + if (msg.head.vgId) { sDebug("%s, vgId in head is not zero, close the connection", pNode->id); tfree(pNode); taosCloseSocket(connFd); @@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) { int32_t bytes = 0; char * cont = (char *)buffer; - int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); - if (hlen != sizeof(head)) { + int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead)); + if (hlen != sizeof(SSyncHead)) { sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); return -1; } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 847dbc2f5d..5e3355e68f 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -384,18 +384,16 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { SSyncPeer *pPeer = pNode->pMaster; if (pPeer && pNode->quorum > 1) { - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SFwdRsp fwdRsp = {0}; - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); + fwdRsp.head.type = TAOS_SMSG_FORWARD_RSP; + fwdRsp.head.protocol = SYNC_PROTOCOL_VERSION; + fwdRsp.head.vgId = pNode->vgId; + fwdRsp.head.len = sizeof(SFwdRsp) - sizeof(SSyncHead); + fwdRsp.version = version; + fwdRsp.code = code; - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; - - int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) { + if (taosWriteMsg(pPeer->peerFd, &fwdRsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) { sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); } else { sDebug("%s, failed to send forward ack, restart", pPeer->id); @@ -865,20 +863,22 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { sDebug("%s, try to sync", pPeer->id); - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; - firstPkt.syncHead.vgId = pNode->vgId; - firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); - firstPkt.tranId = syncGenTranId(); - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; + SSyncMsg msg; + memset(&msg, 0, sizeof(SSyncMsg)); + msg.head.type = TAOS_SMSG_SYNC_REQ; + msg.head.protocol = SYNC_PROTOCOL_VERSION; + msg.head.vgId = pNode->vgId; + msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead); + msg.port = tsSyncPort; + msg.tranId = syncGenTranId(); + tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN); + taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); - if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { - sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); + sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]); } } @@ -958,7 +958,7 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { // head.len = htonl(head.len); if (pHead->len < 0) { - sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len); + sError("%s, invalid msg length, hlen:%d", pPeer->id, pHead->len); return -1; } @@ -1052,17 +1052,19 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { return; } - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0; - firstPkt.syncHead.type = TAOS_SMSG_STATUS; - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; - firstPkt.tranId = syncGenTranId(); - firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId + SSyncMsg msg; + memset(&msg, 0, sizeof(SSyncMsg)); + msg.head.type = TAOS_SMSG_STATUS; + msg.head.protocol = SYNC_PROTOCOL_VERSION; + msg.head.vgId = pPeer->nodeId ? pNode->vgId : 0; + msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead); + msg.port = tsSyncPort; + msg.tranId = syncGenTranId(); + msg.sourceId = pNode->vgId; // tell arbitrator its vgId + tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN); - if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { - sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); + if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) { + sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); @@ -1116,14 +1118,14 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); - SFirstPkt firstPkt; - if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); + SSyncMsg msg; + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } - int32_t vgId = firstPkt.syncHead.vgId; + int32_t vgId = msg.head.vgId; SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { sError("vgId:%d, vgId could not be found", vgId); @@ -1131,7 +1133,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } - sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); + sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; pthread_mutex_lock(&pNode->mutex); @@ -1139,20 +1141,20 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; + if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break; } pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; if (pPeer == NULL) { - sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port); + sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port); taosCloseSocket(connFd); // syncSendVpeerCfgMsg(sync); } else { // first packet tells what kind of link - if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) { + if (msg.head.type == TAOS_SMSG_SYNC_DATA) { pPeer->syncFd = connFd; nodeSStatus = TAOS_SYNC_STATUS_START; - sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, + sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]); syncCreateRestoreDataThread(pPeer); } else { @@ -1334,13 +1336,14 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - // only pkt from RPC or CQ can be forwarded + // only msg from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; // a hacker way to improve the performance pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; + pSyncHead->protocol = SYNC_PROTOCOL_VERSION; + pSyncHead->vgId = pNode->vgId; pSyncHead->len = sizeof(SWalHead) + pWalHead->len; fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 4247468bca..850a9b78b7 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -289,12 +289,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { uint64_t fversion = 0; sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; - if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { - sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()}; + if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) { + sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno)); return -1; } - sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); + sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 82e3700c7a..981716d561 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -405,27 +405,29 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; - SFirstPkt firstPkt; - memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; - firstPkt.syncHead.vgId = pNode->vgId; - firstPkt.tranId = syncGenTranId(); - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); - firstPkt.port = tsSyncPort; + SSyncMsg msg; + memset(&msg, 0, sizeof(SSyncMsg)); + msg.head.type = TAOS_SMSG_SYNC_DATA; + msg.head.protocol = SYNC_PROTOCOL_VERSION; + msg.head.vgId = pNode->vgId; + msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead); + msg.port = tsSyncPort; + msg.tranId = syncGenTranId(); + tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN); - if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); + if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId); return -1; } - sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); + sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId); - SFirstPktRsp firstPktRsp; - if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { - sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); + SSyncRsp rsp; + if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) { + sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId); return -1; } - sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); + sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId); return 0; } diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 5a64a0a36a..161105d86c 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -100,7 +100,7 @@ int processRpcMsg(void *item) { pHead->msgType = pMsg->msgType; pHead->len = pMsg->contLen; - uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); + uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version); writeIntoWal(pHead); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); @@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) { int writeToCache(int32_t vgId, void *data, int type) { SWalHead *pHead = data; - uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); + uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); int msgSize = pHead->len + sizeof(SWalHead); void *pMsg = taosAllocateQitem(msgSize);