commit
1d7f65e1ae
|
@ -114,26 +114,26 @@ typedef struct {
|
||||||
} SSyncFwds;
|
} SSyncFwds;
|
||||||
|
|
||||||
typedef struct SsyncPeer {
|
typedef struct SsyncPeer {
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
char fqdn[TSDB_FQDN_LEN]; // peer ip string
|
char fqdn[TSDB_FQDN_LEN]; // peer ip string
|
||||||
char id[TSDB_EP_LEN+16]; // peer vgId + end point
|
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int8_t sstatus; // sync status
|
int8_t sstatus; // sync status
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint64_t sversion; // track the peer version in retrieve process
|
uint64_t sversion; // track the peer version in retrieve process
|
||||||
int syncFd;
|
int syncFd;
|
||||||
int peerFd; // forward FD
|
int peerFd; // forward FD
|
||||||
int numOfRetrieves; // number of retrieves tried
|
int numOfRetrieves; // number of retrieves tried
|
||||||
int fileChanged; // a flag to indicate file is changed during retrieving process
|
int fileChanged; // a flag to indicate file is changed during retrieving process
|
||||||
void *timer;
|
void * timer;
|
||||||
void *pConn;
|
void * pConn;
|
||||||
int notifyFd;
|
int notifyFd;
|
||||||
int watchNum;
|
int watchNum;
|
||||||
int *watchFd;
|
int * watchFd;
|
||||||
int8_t refCount; // reference count
|
int8_t refCount; // reference count
|
||||||
struct SSyncNode *pSyncNode;
|
struct SSyncNode *pSyncNode;
|
||||||
} SSyncPeer;
|
} SSyncPeer;
|
||||||
|
|
||||||
typedef struct SSyncNode {
|
typedef struct SSyncNode {
|
||||||
|
|
|
@ -622,7 +622,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
|
||||||
if (onlineNum <= replica * 0.5) {
|
if (onlineNum <= replica * 0.5) {
|
||||||
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
|
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
|
||||||
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
|
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
|
||||||
pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
|
//pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
|
||||||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||||
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
|
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
|
||||||
}
|
}
|
||||||
|
@ -671,7 +671,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
|
||||||
int8_t selfOldRole = nodeRole;
|
int8_t selfOldRole = nodeRole;
|
||||||
int8_t i, syncRequired = 0;
|
int8_t i, syncRequired = 0;
|
||||||
|
|
||||||
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
//pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
||||||
pPeer->role = newRole;
|
pPeer->role = newRole;
|
||||||
|
|
||||||
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
|
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
|
||||||
|
@ -923,7 +923,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
||||||
static int syncProcessPeerMsg(void *param, void *buffer) {
|
static int syncProcessPeerMsg(void *param, void *buffer) {
|
||||||
SSyncPeer *pPeer = param;
|
SSyncPeer *pPeer = param;
|
||||||
SSyncHead head;
|
SSyncHead head;
|
||||||
char * cont = (char *)buffer;
|
char * cont = buffer;
|
||||||
|
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
|
|
|
@ -27,29 +27,29 @@
|
||||||
#include "tsync.h"
|
#include "tsync.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
|
||||||
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
|
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
||||||
static void arbProcessBrokenLink(void *param);
|
static void arbProcessBrokenLink(void *param);
|
||||||
static int arbProcessPeerMsg(void *param, void *buffer);
|
static int arbProcessPeerMsg(void *param, void *buffer);
|
||||||
static tsem_t tsArbSem;
|
static tsem_t tsArbSem;
|
||||||
static ttpool_h tsArbTcpPool;
|
static ttpool_h tsArbTcpPool;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char id[TSDB_EP_LEN+24];
|
char id[TSDB_EP_LEN + 24];
|
||||||
int nodeFd;
|
int nodeFd;
|
||||||
void *pConn;
|
void *pConn;
|
||||||
} SNodeConn;
|
} SNodeConn;
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
|
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
|
||||||
|
|
||||||
for (int i=1; i<argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
tsArbitratorPort = atoi(argv[++i]);
|
tsArbitratorPort = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
debugFlag = atoi(argv[++i]);
|
debugFlag = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-g")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-g") == 0 && i < argc - 1) {
|
||||||
if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue;
|
if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue;
|
||||||
tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath));
|
tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath));
|
||||||
} else {
|
} else {
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
@ -62,8 +62,8 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebugFlag = debugFlag;
|
sDebugFlag = debugFlag;
|
||||||
|
|
||||||
if (tsem_init(&tsArbSem, 0, 0) != 0) {
|
if (tsem_init(&tsArbSem, 0, 0) != 0) {
|
||||||
printf("failed to create exit semphore\n");
|
printf("failed to create exit semphore\n");
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
@ -91,10 +91,10 @@ int main(int argc, char *argv[]) {
|
||||||
info.processIncomingMsg = arbProcessPeerMsg;
|
info.processIncomingMsg = arbProcessPeerMsg;
|
||||||
info.processIncomingConn = arbProcessIncommingConnection;
|
info.processIncomingConn = arbProcessIncommingConnection;
|
||||||
tsArbTcpPool = taosOpenTcpThreadPool(&info);
|
tsArbTcpPool = taosOpenTcpThreadPool(&info);
|
||||||
|
|
||||||
if (tsArbTcpPool == NULL) {
|
if (tsArbTcpPool == NULL) {
|
||||||
sDebug("failed to open TCP thread pool, exit...");
|
sDebug("failed to open TCP thread pool, exit...");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort);
|
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort);
|
||||||
|
@ -108,9 +108,8 @@ int main(int argc, char *argv[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
|
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
||||||
{
|
char ipstr[24];
|
||||||
char ipstr[24];
|
|
||||||
tinet_ntoa(ipstr, sourceIp);
|
tinet_ntoa(ipstr, sourceIp);
|
||||||
sDebug("peer TCP connection from ip:%s", ipstr);
|
sDebug("peer TCP connection from ip:%s", ipstr);
|
||||||
|
|
||||||
|
@ -121,15 +120,16 @@ static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeConn *pNode = (SNodeConn *) calloc(sizeof(SNodeConn), 1);
|
SNodeConn *pNode = (SNodeConn *)calloc(sizeof(SNodeConn), 1);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
sError("failed to allocate memory(%s)", strerror(errno));
|
sError("failed to allocate memory(%s)", strerror(errno));
|
||||||
taosCloseSocket(connFd);
|
taosCloseSocket(connFd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
|
||||||
if (firstPkt.syncHead.vgId) {
|
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
||||||
|
if (firstPkt.syncHead.vgId) {
|
||||||
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
|
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
|
||||||
taosTFree(pNode);
|
taosTFree(pNode);
|
||||||
taosCloseSocket(connFd);
|
taosCloseSocket(connFd);
|
||||||
|
@ -151,10 +151,10 @@ static void arbProcessBrokenLink(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int arbProcessPeerMsg(void *param, void *buffer) {
|
static int arbProcessPeerMsg(void *param, void *buffer) {
|
||||||
SNodeConn * pNode = param;
|
SNodeConn *pNode = param;
|
||||||
SSyncHead head;
|
SSyncHead head;
|
||||||
int bytes = 0;
|
int bytes = 0;
|
||||||
char *cont = (char *)buffer;
|
char * cont = (char *)buffer;
|
||||||
|
|
||||||
int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
|
int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
|
||||||
if (hlen != sizeof(head)) {
|
if (hlen != sizeof(head)) {
|
||||||
|
|
Loading…
Reference in New Issue