commit
37fe814788
|
@ -32,7 +32,7 @@
|
||||||
// global configurable
|
// global configurable
|
||||||
int tsMaxSyncNum = 2;
|
int tsMaxSyncNum = 2;
|
||||||
int tsSyncTcpThreads = 2;
|
int tsSyncTcpThreads = 2;
|
||||||
int tsMaxWatchFiles = 100;
|
int tsMaxWatchFiles = 500;
|
||||||
int tsMaxFwdInfo = 200;
|
int tsMaxFwdInfo = 200;
|
||||||
int tsSyncTimer = 1;
|
int tsSyncTimer = 1;
|
||||||
//int sDebugFlag = 135;
|
//int sDebugFlag = 135;
|
||||||
|
@ -516,7 +516,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
|
||||||
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
||||||
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
||||||
sDebug("%s, start to check peer connection", pPeer->id);
|
sDebug("%s, start to check peer connection", pPeer->id);
|
||||||
taosTmrReset(syncCheckPeerConnection, 100, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAddNodeRef(pNode);
|
syncAddNodeRef(pNode);
|
||||||
|
@ -815,7 +815,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
|
||||||
taosTmrStopA(&pPeer->timer);
|
taosTmrStopA(&pPeer->timer);
|
||||||
if (tsSyncNum >= tsMaxSyncNum) {
|
if (tsSyncNum >= tsMaxSyncNum) {
|
||||||
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
|
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
|
||||||
taosTmrReset(syncTryRecoverFromMaster, 500, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId*10)%200, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,13 +57,14 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY);
|
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE);
|
||||||
if (*wd == -1) {
|
if (*wd == -1) {
|
||||||
sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno));
|
sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
|
} else {
|
||||||
|
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
pPeer->watchNum++;
|
|
||||||
pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles;
|
pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -75,16 +76,24 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
|
||||||
|
|
||||||
char buf[2048];
|
char buf[2048];
|
||||||
int len = read(pPeer->notifyFd, buf, sizeof(buf));
|
int len = read(pPeer->notifyFd, buf, sizeof(buf));
|
||||||
if (len <0 && errno != EAGAIN) {
|
if (len < 0 && errno != EAGAIN) {
|
||||||
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
|
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = 0;
|
int code = 0;
|
||||||
if (len >0) {
|
if (len > 0) {
|
||||||
sDebug("%s, processed file is changed", pPeer->id);
|
const struct inotify_event *event;
|
||||||
pPeer->fileChanged = 1;
|
char *ptr;
|
||||||
code = 1;
|
for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
|
||||||
|
event = (const struct inotify_event *) ptr;
|
||||||
|
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
|
||||||
|
sDebug("%s, processed file is changed", pPeer->id);
|
||||||
|
pPeer->fileChanged = 1;
|
||||||
|
code = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue