TD-2086
This commit is contained in:
parent
11a5fe2719
commit
99815378bf
|
@ -41,6 +41,7 @@ typedef enum {
|
|||
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
|
||||
#define SYNC_FWD_TIMER 300
|
||||
#define SYNC_ROLE_TIMER 10000
|
||||
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
|
||||
|
||||
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
|
||||
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
|
||||
|
|
|
@ -626,17 +626,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
|||
sInfo("vgId:%d, start to work as master", pNode->vgId);
|
||||
nodeRole = TAOS_SYNC_ROLE_MASTER;
|
||||
|
||||
#if 0
|
||||
for (int32_t i = 0; i < pNode->replica; ++i) {
|
||||
if (i == index) continue;
|
||||
pPeer = pNode->peerInfo[i];
|
||||
if (pPeer->version == nodeVersion) {
|
||||
pPeer->role = TAOS_SYNC_ROLE_SLAVE;
|
||||
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
|
||||
sInfo("%s, it shall work as slave", pPeer->id);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
// Wait for other nodes to receive status to avoid version inconsistency
|
||||
taosMsleep(SYNC_WAIT_AFTER_CHOOSE_MASTER);
|
||||
|
||||
syncResetFlowCtrl(pNode);
|
||||
(*pNode->notifyRole)(pNode->vgId, nodeRole);
|
||||
} else {
|
||||
|
@ -761,7 +753,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
|
|||
sDebug("vgId:%d, choose master", pNode->vgId);
|
||||
syncChooseMaster(pNode);
|
||||
} else {
|
||||
sDebug("vgId:%d, cannot choose master since roles inconsistent", pNode->vgId);
|
||||
sDebug("vgId:%d, cannot choose master since roles inequality", pNode->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1124,7 +1116,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
|||
}
|
||||
|
||||
int32_t vgId = firstPkt.syncHead.vgId;
|
||||
SSyncNode **ppNode = (SSyncNode **)taosHashGet(tsVgIdHash, (const char *)&vgId, sizeof(int32_t));
|
||||
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
|
||||
if (ppNode == NULL || *ppNode == NULL) {
|
||||
sError("vgId:%d, vgId could not be found", vgId);
|
||||
taosCloseSocket(connFd);
|
||||
|
|
|
@ -147,12 +147,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int32_t ret, code = -1;
|
||||
uint64_t lastVer = 0;
|
||||
|
||||
void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record
|
||||
if (buffer == NULL) return -1;
|
||||
|
||||
SWalHead *pHead = (SWalHead *)buffer;
|
||||
uint64_t lastVer = 0;
|
||||
SWalHead *pHead = calloc(SYNC_MAX_SIZE, 1); // size for one record
|
||||
if (pHead == NULL) return -1;
|
||||
|
||||
while (1) {
|
||||
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
|
||||
|
@ -188,7 +186,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
|||
sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno));
|
||||
}
|
||||
|
||||
free(buffer);
|
||||
free(pHead);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue