add debug info for sync module
This commit is contained in:
parent
6f004e4c7a
commit
53774be0ca
|
@ -65,6 +65,9 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int8_t ack;
|
int8_t ack;
|
||||||
|
int8_t type;
|
||||||
|
int8_t reserved[3];
|
||||||
|
uint16_t tranId;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
SPeerStatus peersStatus[];
|
SPeerStatus peersStatus[];
|
||||||
} SPeersStatus;
|
} SPeersStatus;
|
||||||
|
|
|
@ -48,7 +48,7 @@ static void * vgIdHash;
|
||||||
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
|
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
|
||||||
static void syncRecoverFromMaster(SSyncPeer *pPeer);
|
static void syncRecoverFromMaster(SSyncPeer *pPeer);
|
||||||
static void syncCheckPeerConnection(void *param, void *tmrId);
|
static void syncCheckPeerConnection(void *param, void *tmrId);
|
||||||
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack);
|
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
|
||||||
static void syncProcessBrokenLink(void *param);
|
static void syncProcessBrokenLink(void *param);
|
||||||
static int syncProcessPeerMsg(void *param, void *buffer);
|
static int syncProcessPeerMsg(void *param, void *buffer);
|
||||||
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
||||||
|
@ -71,6 +71,28 @@ char* syncRole[] = {
|
||||||
"master"
|
"master"
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
SYNC_STATUS_BROADCAST,
|
||||||
|
SYNC_STATUS_BROADCAST_RSP,
|
||||||
|
SYNC_STATUS_SETUP_CONN,
|
||||||
|
SYNC_STATUS_SETUP_CONN_RSP,
|
||||||
|
SYNC_STATUS_EXCHANGE_DATA,
|
||||||
|
SYNC_STATUS_EXCHANGE_DATA_RSP
|
||||||
|
} ESyncStatusType;
|
||||||
|
|
||||||
|
char *statusType[] = {
|
||||||
|
"broadcast",
|
||||||
|
"broadcast-rsp",
|
||||||
|
"setup-conn",
|
||||||
|
"setup-conn-rsp",
|
||||||
|
"exchange-data",
|
||||||
|
"exchange-data-rsp"
|
||||||
|
};
|
||||||
|
|
||||||
|
uint16_t syncGenTranId() {
|
||||||
|
return taosRand() & 0XFFFF;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncInit() {
|
int32_t syncInit() {
|
||||||
SPoolInfo info;
|
SPoolInfo info;
|
||||||
|
|
||||||
|
@ -539,7 +561,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
if (i == pNode->selfIndex) continue;
|
if (i == pNode->selfIndex) continue;
|
||||||
pPeer = pNode->peerInfo[i];
|
pPeer = pNode->peerInfo[i];
|
||||||
syncSendPeersStatusMsgToPeer(pPeer, 1);
|
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -894,14 +916,14 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode * pNode = pPeer->pSyncNode;
|
SSyncNode * pNode = pPeer->pSyncNode;
|
||||||
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
||||||
|
|
||||||
sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
|
sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id,
|
||||||
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
|
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
|
||||||
|
|
||||||
pPeer->version = pPeersStatus->version;
|
pPeer->version = pPeersStatus->version;
|
||||||
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
|
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
|
||||||
|
|
||||||
if (pPeersStatus->ack) {
|
if (pPeersStatus->ack) {
|
||||||
syncSendPeersStatusMsgToPeer(pPeer, 0);
|
syncSendPeersStatusMsgToPeer(pPeer, 0, pPeersStatus->type + 1, pPeersStatus->tranId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -958,7 +980,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
|
||||||
|
|
||||||
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
|
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
|
||||||
|
|
||||||
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
char msg[statusMsgLen] = {0};
|
char msg[statusMsgLen] = {0};
|
||||||
|
|
||||||
|
@ -973,6 +995,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
||||||
pPeersStatus->version = nodeVersion;
|
pPeersStatus->version = nodeVersion;
|
||||||
pPeersStatus->role = nodeRole;
|
pPeersStatus->role = nodeRole;
|
||||||
pPeersStatus->ack = ack;
|
pPeersStatus->ack = ack;
|
||||||
|
pPeersStatus->type = type;
|
||||||
|
pPeersStatus->tranId = tranId;
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
|
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
|
||||||
|
@ -981,8 +1005,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
||||||
|
|
||||||
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
||||||
if (retLen == statusMsgLen) {
|
if (retLen == statusMsgLen) {
|
||||||
sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
|
sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d tranId:%u type:%s", pPeer->id, syncRole[pPeersStatus->role],
|
||||||
pPeersStatus->version, pPeersStatus->ack);
|
pPeersStatus->version, pPeersStatus->ack, pPeersStatus->tranId, statusType[pPeersStatus->type]);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
@ -997,7 +1021,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
|
||||||
taosTmrStopA(&pPeer->timer);
|
taosTmrStopA(&pPeer->timer);
|
||||||
if (pPeer->peerFd >= 0) {
|
if (pPeer->peerFd >= 0) {
|
||||||
sDebug("%s, send role version to peer", pPeer->id);
|
sDebug("%s, send role version to peer", pPeer->id);
|
||||||
syncSendPeersStatusMsgToPeer(pPeer, 1);
|
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_SETUP_CONN, syncGenTranId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1110,7 +1134,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
||||||
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
|
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
|
||||||
syncAddPeerRef(pPeer);
|
syncAddPeerRef(pPeer);
|
||||||
sDebug("%s, ready to exchange data", pPeer->id);
|
sDebug("%s, ready to exchange data", pPeer->id);
|
||||||
syncSendPeersStatusMsgToPeer(pPeer, 1);
|
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue