commit 4d789b0871
parent 99815378bf

    TD-2211
@@ -28,13 +28,14 @@ extern "C" {
 #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
 
 typedef enum {
   TAOS_SMSG_SYNC_DATA = 1,
   TAOS_SMSG_FORWARD = 2,
   TAOS_SMSG_FORWARD_RSP = 3,
   TAOS_SMSG_SYNC_REQ = 4,
   TAOS_SMSG_SYNC_RSP = 5,
   TAOS_SMSG_SYNC_MUST = 6,
-  TAOS_SMSG_STATUS = 7
+  TAOS_SMSG_STATUS = 7,
+  TAOS_SMSG_SYNC_DATA_RSP = 8,
 } ESyncMsgType;
 
 #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)

@@ -65,6 +66,10 @@ typedef struct {
   int32_t sourceId;  // only for arbitrator
 } SFirstPkt;
 
+typedef struct {
+  int8_t sync;
+} SFirstPktRsp;
+
 typedef struct {
   int8_t   role;
   uint64_t version;
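The two header hunks above define the new handshake: the existing SFirstPkt announces a sync session, and the new one-byte SFirstPktRsp acknowledges it. A minimal sketch of the intended exchange, using only names from this diff; the example* helpers are illustrative and not part of the commit:

// Retrieving side: announce the session, then block until the peer is ready.
static int32_t exampleSendFirstPkt(SSyncPeer *pPeer, SFirstPkt *pPkt) {
  if (taosWriteMsg(pPeer->syncFd, pPkt, sizeof(SFirstPkt)) < 0) return -1;

  SFirstPktRsp rsp;
  if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SFirstPktRsp)) < 0) return -1;
  return (rsp.sync == 1) ? 0 : -1;
}

// Restoring side: acknowledge before starting the restore loop.
static int32_t exampleAckFirstPkt(SSyncPeer *pPeer) {
  SFirstPktRsp rsp = {.sync = 1};
  return (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SFirstPktRsp)) < 0) ? -1 : 0;
}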
@@ -1313,6 +1313,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
   }
 
   // always update version
+  sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
+         syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
   nodeVersion = pWalHead->version;
 
   if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;

@@ -1320,10 +1322,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
   // only pkt from RPC or CQ can be forwarded
   if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
 
-  sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
-         syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
-
   // a hacker way to improve the performance
   pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
   pSyncHead->type = TAOS_SMSG_FORWARD;
   pSyncHead->pversion = 0;

@@ -1344,9 +1343,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
 
     int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
     if (retLen == fwdLen) {
-      sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
+      sTrace("%s, forward is sent, role:%s sstatus:%s hver:%" PRIu64 " contLen:%d", pPeer->id, syncRole[pPeer->role],
+             syncStatus[pPeer->sstatus], pWalHead->version, pWalHead->len);
     } else {
-      sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
+      sError("%s, failed to forward, role:%s sstatus:%s hver:%" PRIu64 " retLen:%d", pPeer->id, syncRole[pPeer->role],
+             syncStatus[pPeer->sstatus], pWalHead->version, retLen);
       syncRestartConnection(pPeer);
     }
   }
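The retained comment "a hacker way to improve the performance" refers to stamping the SSyncHead into the bytes immediately before the SWalHead, so header and payload go out in a single taosWriteMsg call. That only works if the buffer holding the wal record reserves the headroom; a hedged sketch of the layout assumption (the function, maxPayload, and the malloc are illustrative, not the commit's code):

#include <stdlib.h>

static void exampleHeadroomLayout(int32_t maxPayload) {
  // reserve sizeof(SSyncHead) spare bytes in front of the wal record
  char *buf = malloc(sizeof(SSyncHead) + sizeof(SWalHead) + maxPayload);
  SWalHead *pWalHead = (SWalHead *)(buf + sizeof(SSyncHead));
  // ... fill pWalHead->len and its payload ...
  SSyncHead *pSyncHead = (SSyncHead *)((char *)pWalHead - sizeof(SSyncHead));
  pSyncHead->type = TAOS_SMSG_FORWARD;  // stamped in place, no staging copy
  // a single taosWriteMsg(fd, pSyncHead, sizeof(SSyncHead) + sizeof(SWalHead)
  // + pWalHead->len) then sends sync header, wal header and payload together
}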
@@ -231,10 +231,13 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
 int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
   SSyncNode * pNode = pPeer->pSyncNode;
   SRecvBuffer *pRecv = pNode->pRecv;
 
-  if (pRecv == NULL) return -1;
   int32_t len = pHead->len + sizeof(SWalHead);
 
+  if (pRecv == NULL) {
+    sError("%s, recv buffer is not create yet", pPeer->id);
+    return -1;
+  }
   if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) {
     memcpy(pRecv->offset, pHead, len);
     pRecv->offset += len;

@@ -282,7 +285,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
   nodeSStatus = TAOS_SYNC_STATUS_FILE;
   uint64_t fversion = 0;
 
-  sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
+  sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
+  SFirstPktRsp firstPktRsp = {.sync = 1};
+  if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
+    sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
+    return -1;
+  }
+
+  sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
   int32_t code = syncRestoreFile(pPeer, &fversion);
   if (code < 0) {
     sError("%s, failed to restore file", pPeer->id);

@@ -299,14 +309,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
 
   nodeVersion = fversion;
 
-  sDebug("%s, start to restore wal", pPeer->id);
+  sInfo("%s, start to restore wal", pPeer->id);
   if (syncRestoreWal(pPeer) < 0) {
     sError("%s, failed to restore wal", pPeer->id);
     return -1;
   }
 
   nodeSStatus = TAOS_SYNC_STATUS_CACHE;
-  sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
+  sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
   if (syncProcessBufferedFwd(pPeer) < 0) {
     sError("%s, failed to insert buffered points", pPeer->id);
     return -1;
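Taken together, the restore-side changes order the work as: acknowledge the first packet, restore files, restore the wal, then drain the buffered forwards. A condensed sketch of that sequence (exampleRestoreSteps is illustrative; the real syncRestoreDataStepByStep also manages the nodeSStatus transitions and logging shown above):

static int32_t exampleRestoreSteps(SSyncPeer *pPeer) {
  uint64_t fversion = 0;

  // unblock the retrieving side, which now waits for this ack
  SFirstPktRsp rsp = {.sync = 1};
  if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(rsp)) < 0) return -1;

  if (syncRestoreFile(pPeer, &fversion) < 0) return -1;  // 1. files
  nodeVersion = fversion;
  if (syncRestoreWal(pPeer) < 0) return -1;              // 2. wal
  return syncProcessBufferedFwd(pPeer);                  // 3. buffered forwards
}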
@@ -448,7 +448,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
   return code;
 }
 
-static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
+static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
   SSyncNode *pNode = pPeer->pSyncNode;
 
   SFirstPkt firstPkt;

@@ -458,8 +458,24 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
   tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
   firstPkt.port = tsSyncPort;
 
-  if (taosWriteMsg(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) {
-    sError("%s, failed to send syncCmd", pPeer->id);
+  if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) {
+    sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno));
+    return -1;
+  }
+
+  SFirstPktRsp firstPktRsp;
+  if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
+    sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno));
+    return -1;
+  }
+
+  return 0;
+}
+
+static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
+  sInfo("%s, start to retrieve, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
+  if (syncRetrieveFirstPkt(pPeer) < 0) {
+    sError("%s, failed to start retrieve", pPeer->id);
     return -1;
   }
 
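syncRetrieveFirstPkt now blocks in taosReadMsg until the full SFirstPktRsp has arrived, so the retrieve thread cannot race ahead of a peer that is still setting up. The diff relies on taosReadMsg/taosWriteMsg transferring exactly the requested length; a generic sketch of that contract (readExact is illustrative, not TDengine's implementation):

#include <errno.h>
#include <unistd.h>

// Read exactly len bytes or fail: return len on success, -1 on error/EOF.
static int readExact(int fd, void *buf, int len) {
  char *p = (char *)buf;
  int   left = len;
  while (left > 0) {
    int n = (int)read(fd, p, left);
    if (n == 0) return -1;           // peer closed the connection
    if (n < 0) {
      if (errno == EINTR) continue;  // interrupted by a signal, retry
      return -1;
    }
    p += n;
    left -= n;
  }
  return len;
}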
@@ -203,16 +203,19 @@ static void *taosProcessTcpData(void *param) {
       assert(pConn);
 
       if (events[i].events & EPOLLERR) {
+        sDebug("conn is broken since EPOLLERR");
         taosProcessBrokenLink(pConn);
         continue;
       }
 
       if (events[i].events & EPOLLHUP) {
+        sDebug("conn is broken since EPOLLHUP");
         taosProcessBrokenLink(pConn);
         continue;
       }
 
       if (events[i].events & EPOLLRDHUP) {
+        sDebug("conn is broken since EPOLLRDHUP");
         taosProcessBrokenLink(pConn);
         continue;
       }
 
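One caveat on the three new sDebug branches: epoll always reports EPOLLERR and EPOLLHUP even when they were not requested, but EPOLLRDHUP (peer shut down its write side) is delivered only if it was part of the registered event mask, so the third message can only appear if the connection was added with that flag. An illustrative registration; the function name, pollFd, and connFd are assumed stand-ins for the tcp pool's own names:

#include <sys/epoll.h>

static int exampleRegisterConn(int pollFd, int connFd, void *pConn) {
  struct epoll_event ev = {0};
  ev.events = EPOLLIN | EPOLLRDHUP;  // EPOLLERR/EPOLLHUP are reported implicitly
  ev.data.ptr = pConn;               // pConn as dereferenced in taosProcessTcpData
  return epoll_ctl(pollFd, EPOLL_CTL_ADD, connFd, &ev);
}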