|
|
|
@ -26,39 +26,78 @@
|
|
|
|
|
#include "tsync.h"
|
|
|
|
|
#include "syncInt.h"
|
|
|
|
|
|
|
|
|
|
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
uint64_t fver = 0;
|
|
|
|
|
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
|
|
|
|
|
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer);
|
|
|
|
|
pPeer->fileChanged = 1;
|
|
|
|
|
return TSDB_CODE_SYN_VND_COMMITING;
|
|
|
|
|
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (fver != pPeer->lastVer) {
|
|
|
|
|
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
|
|
|
|
|
pPeer->lastWalVer = wver;
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
 * Tell whether the WAL version differs from the one recorded on the peer.
 *
 * Queries the node's getVersion callback for the current file/WAL versions.
 * Returns true when the callback returns non-zero, or when the reported WAL
 * version is not equal to pPeer->lastWalVer; returns false otherwise.
 * The peer is only read here, never updated.
 */
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer;  // filled by the callback, not used by this check
  uint64_t walVer;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    // the version could not be obtained; report it as modified
    sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
    return true;
  }

  bool modified = (walVer != pPeer->lastWalVer);
  if (modified) {
    sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, walVer, pPeer->lastWalVer);
  }

  return modified;
}
|
|
|
|
|
|
|
|
|
|
/*
 * Record the current file version on the peer.
 *
 * Asks the node's getVersion callback for the file/WAL versions. When the
 * callback returns non-zero, -1 is returned and pPeer->lastFileVer is left
 * untouched. Otherwise the reported file version is stored in
 * pPeer->lastFileVer and 0 is returned.
 */
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer;
  uint64_t walVer;  // required by the callback signature, unused here

  int32_t rc = (*pNode->getVersion)(pNode->vgId, &fileVer, &walVer);
  if (rc == 0) {
    pPeer->lastFileVer = fileVer;
    return rc;
  }

  sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
  return -1;
}
|
|
|
|
|
|
|
|
|
|
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
|
|
|
|
pPeer->fileChanged = 1;
|
|
|
|
|
return TSDB_CODE_SYN_FILE_CHNAGED;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (fver != pPeer->lastFileVer) {
|
|
|
|
|
sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
|
|
|
|
|
pPeer->fileChanged = 1;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pPeer->fileChanged = 0;
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
SSyncNode *pNode = pPeer->pSyncNode;
|
|
|
|
|
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
|
|
|
|
|
SFileAck fileAck = {0};
|
|
|
|
|
int32_t code = TSDB_CODE_SYN_APP_ERROR;
|
|
|
|
|
int32_t code = -1;
|
|
|
|
|
char name[TSDB_FILENAME_LEN * 2] = {0};
|
|
|
|
|
|
|
|
|
|
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
|
|
|
|
|
if (syncGetFileVersion(pNode, pPeer) < 0) return -1;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
// retrieve file info
|
|
|
|
|
fileInfo.name[0] = 0;
|
|
|
|
|
fileInfo.size = 0;
|
|
|
|
|
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
|
|
|
|
&fileInfo.size, &fileInfo.fversion);
|
|
|
|
|
// fileInfo.size = htonl(size);
|
|
|
|
@ -67,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
// send the file info
|
|
|
|
|
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
|
|
|
|
if (ret < 0) {
|
|
|
|
|
code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if no file anymore, break
|
|
|
|
|
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
|
|
|
|
code = TSDB_CODE_SUCCESS;
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, no more files to sync", pPeer->id);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -82,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
// wait for the ack from peer
|
|
|
|
|
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
|
|
|
|
if (ret < 0) {
|
|
|
|
|
code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -103,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
// send the file to peer
|
|
|
|
|
int32_t sfd = open(name, O_RDONLY);
|
|
|
|
|
if (sfd < 0) {
|
|
|
|
|
code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -111,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
|
|
|
|
close(sfd);
|
|
|
|
|
if (ret < 0) {
|
|
|
|
|
code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -120,128 +159,103 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
|
|
|
|
fileInfo.index++;
|
|
|
|
|
|
|
|
|
|
// check if processed files are modified
|
|
|
|
|
code = syncAreFilesModified(pNode, pPeer);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) break;
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer)) {
|
|
|
|
|
code = -1;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code));
|
|
|
|
|
sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* if only a partial record is read out, set the IN_MODIFY flag in event,
|
|
|
|
|
so upper layer will reload the file to get a complete record */
|
|
|
|
|
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) {
|
|
|
|
|
int32_t ret;
|
|
|
|
|
// if only a partial record is read out, upper layer will reload the file to get a complete record
|
|
|
|
|
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
|
|
|
|
|
int32_t ret = read(sfd, pHead, sizeof(SWalHead));
|
|
|
|
|
if (ret < 0) {
|
|
|
|
|
sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = read(sfd, pHead, sizeof(SWalHead));
|
|
|
|
|
if (ret < 0) return -1;
|
|
|
|
|
if (ret == 0) return 0;
|
|
|
|
|
if (ret == 0) {
|
|
|
|
|
sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ret != sizeof(SWalHead)) {
|
|
|
|
|
// file is not at end yet, it shall be reloaded
|
|
|
|
|
*pEvent = *pEvent | IN_MODIFY;
|
|
|
|
|
sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
|
|
|
|
|
|
|
|
|
|
ret = read(sfd, pHead->cont, pHead->len);
|
|
|
|
|
if (ret < 0) return -1;
|
|
|
|
|
if (ret < 0) {
|
|
|
|
|
sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ret != pHead->len) {
|
|
|
|
|
// file is not at end yet, it shall be reloaded
|
|
|
|
|
*pEvent = *pEvent | IN_MODIFY;
|
|
|
|
|
sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return sizeof(SWalHead) + pHead->len;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
 * Start an inotify watch on the last WAL file.
 *
 * Resets pPeer->watchNum, hands the previous pPeer->notifyFd to taosClose(),
 * creates a new non-blocking inotify instance, makes sure pPeer->watchFd is
 * allocated (tsMaxWatchFiles entries, all reset to -1), and registers `name`
 * for IN_MODIFY | IN_CLOSE_WRITE in the first slot.
 *
 * Returns 0 on success and -1 if inotify creation, the allocation, or adding
 * the watch fails. On failure the new notifyFd is not closed here.
 */
static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
  const size_t watchBytes = sizeof(int32_t) * tsMaxWatchFiles;

  pPeer->watchNum = 0;

  // replace the previous inotify instance with a new one
  taosClose(pPeer->notifyFd);
  pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
  if (pPeer->notifyFd < 0) {
    sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
    return -1;
  }

  // the watch descriptor array is allocated once and reused afterwards
  if (pPeer->watchFd == NULL) {
    pPeer->watchFd = malloc(watchBytes);
    if (pPeer->watchFd == NULL) {
      sError("%s, failed to allocate watchFd", pPeer->id);
      return -1;
    }
  }

  // all-ones bytes make every int32_t slot read back as -1
  memset(pPeer->watchFd, -1, watchBytes);

  pPeer->watchFd[0] = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
  if (pPeer->watchFd[0] == -1) {
    sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno));
    return -1;
  }

  return 0;
}
|
|
|
|
|
|
|
|
|
|
/*
 * Read pending inotify events from pPeer->notifyFd and merge them into *pEvent.
 *
 * For every event read, IN_MODIFY and IN_CLOSE_WRITE bits found in the event
 * mask are OR-ed into *pEvent; bits already set by the caller are preserved.
 *
 * Returns -1 when read() fails with an error other than EAGAIN, otherwise 0.
 * A read of 0 bytes returns immediately without logging.
 */
static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
  char    buf[2048];
  int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
  if (len < 0 && errno != EAGAIN) {
    sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
    return -1;
  }

  if (len == 0) return 0;

  // len < 0 at this point means EAGAIN: there is nothing to parse, and the
  // guard avoids forming the out-of-range pointer `buf + len`.
  if (len > 0) {
    struct inotify_event *event;
    // each record is a fixed header followed by event->len bytes of name
    for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
      event = (struct inotify_event *)ptr;
      if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
      if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
    }
  }

  // test the accumulated flags, not the pointer, so the log only fires when
  // an event is actually recorded
  if (*pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);

  return 0;
}
|
|
|
|
|
|
|
|
|
|
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
|
|
|
|
|
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
|
|
|
|
|
int32_t code = -1;
|
|
|
|
|
int32_t bytes = 0;
|
|
|
|
|
int32_t sfd;
|
|
|
|
|
|
|
|
|
|
sfd = open(name, O_RDONLY);
|
|
|
|
|
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
|
|
|
|
|
int32_t sfd = open(name, O_RDONLY);
|
|
|
|
|
if (sfd < 0) {
|
|
|
|
|
free(pHead);
|
|
|
|
|
sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(void)lseek(sfd, offset, SEEK_SET);
|
|
|
|
|
sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion);
|
|
|
|
|
int32_t code = taosLSeek(sfd, offset, SEEK_SET);
|
|
|
|
|
if (code < 0) {
|
|
|
|
|
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
|
|
|
|
|
close(sfd);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
|
|
|
|
|
int32_t bytes = 0;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
|
|
|
|
|
if (wsize < 0) break;
|
|
|
|
|
if (wsize == 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
code = syncReadOneWalRecord(sfd, pHead);
|
|
|
|
|
if (code < 0) {
|
|
|
|
|
sError("%s, failed to read one record from wal:%s", pPeer->id, name);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code == 0) {
|
|
|
|
|
code = bytes;
|
|
|
|
|
sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
|
|
|
|
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
|
|
|
|
if (ret != wsize) break;
|
|
|
|
|
pPeer->sversion = pHead->version;
|
|
|
|
|
|
|
|
|
|
int32_t wsize = code;
|
|
|
|
|
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
|
|
|
|
if (ret != wsize) {
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pPeer->sversion = pHead->version;
|
|
|
|
|
bytes += wsize;
|
|
|
|
|
|
|
|
|
|
if (pHead->version >= fversion && fversion > 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
bytes = 0;
|
|
|
|
|
sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -249,92 +263,62 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
|
|
|
|
free(pHead);
|
|
|
|
|
close(sfd);
|
|
|
|
|
|
|
|
|
|
if (code == 0) return bytes;
|
|
|
|
|
return -1;
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
|
|
|
|
|
SSyncNode *pNode = pPeer->pSyncNode;
|
|
|
|
|
int32_t code = -1;
|
|
|
|
|
int32_t once = 0; // last WAL has once ever been processed
|
|
|
|
|
int64_t offset = 0;
|
|
|
|
|
uint64_t fversion = 0;
|
|
|
|
|
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
|
|
|
|
|
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer) != 0) return -1;
|
|
|
|
|
// get full path to wal file
|
|
|
|
|
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
|
|
|
|
|
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
int32_t once = 0; // last WAL has once ever been processed
|
|
|
|
|
int64_t offset = 0;
|
|
|
|
|
uint64_t fversion = 0;
|
|
|
|
|
uint32_t event = 0;
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer)) return -1;
|
|
|
|
|
if (syncGetWalVersion(pNode, pPeer) < 0) return -1;
|
|
|
|
|
|
|
|
|
|
// get full path to wal file
|
|
|
|
|
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
|
|
|
|
|
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
|
|
|
|
|
|
|
|
|
|
// monitor last wal
|
|
|
|
|
if (syncMonitorLastWal(pPeer, fname) < 0) break;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
|
|
|
|
|
if (bytes < 0) break;
|
|
|
|
|
|
|
|
|
|
// check file changes
|
|
|
|
|
if (syncCheckLastWalChanges(pPeer, &event) < 0) break;
|
|
|
|
|
|
|
|
|
|
// if file is not updated or updated once, set the fversion and sstatus
|
|
|
|
|
if (((event & IN_MODIFY) == 0) || once) {
|
|
|
|
|
if (fversion == 0) {
|
|
|
|
|
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
|
|
|
|
|
sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
|
|
|
|
fversion = nodeVersion; // must read data to fversion
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if all data up to fversion is read out, it is over
|
|
|
|
|
if (pPeer->sversion >= fversion && fversion > 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if all data are read out, and no update
|
|
|
|
|
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
|
|
|
|
|
// wal file is closed, break
|
|
|
|
|
if (event & IN_CLOSE_WRITE) {
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, current wal is closed", pPeer->id);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// wal not closed, it means some data not flushed to disk, wait for a while
|
|
|
|
|
usleep(10000);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
|
|
|
|
|
once = 1;
|
|
|
|
|
offset += bytes;
|
|
|
|
|
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
|
|
|
|
|
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
|
|
|
|
|
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
|
|
|
|
|
if (bytes < 0) {
|
|
|
|
|
sDebug("%s, failed to retrieve last wal", pPeer->id);
|
|
|
|
|
return bytes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code < 0) break;
|
|
|
|
|
if (pPeer->sversion >= fversion && fversion > 0) break;
|
|
|
|
|
// check file changes
|
|
|
|
|
bool walModified = syncIsWalModified(pNode, pPeer);
|
|
|
|
|
|
|
|
|
|
index++;
|
|
|
|
|
wname[0] = 0;
|
|
|
|
|
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
|
|
|
|
|
if (code < 0) break;
|
|
|
|
|
if (wname[0] == 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
break;
|
|
|
|
|
// if file is not updated or updated once, set the fversion and sstatus
|
|
|
|
|
if (!walModified || once) {
|
|
|
|
|
if (fversion == 0) {
|
|
|
|
|
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
|
|
|
|
|
fversion = nodeVersion; // must read data to fversion
|
|
|
|
|
sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// current last wal is closed, there is a new one
|
|
|
|
|
sDebug("%s, last wal is closed, try new one", pPeer->id);
|
|
|
|
|
// if all data up to fversion is read out, it is over
|
|
|
|
|
if (pPeer->sversion >= fversion && fversion > 0) {
|
|
|
|
|
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes,
|
|
|
|
|
pPeer->sversion);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if all data are read out, and no update
|
|
|
|
|
if (bytes == 0 && !walModified) {
|
|
|
|
|
// wal not closed, it means some data not flushed to disk, wait for a while
|
|
|
|
|
usleep(10000);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
|
|
|
|
|
once = 1;
|
|
|
|
|
offset += bytes;
|
|
|
|
|
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosClose(pPeer->notifyFd);
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
@ -342,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
char fname[TSDB_FILENAME_LEN * 3];
|
|
|
|
|
char wname[TSDB_FILENAME_LEN * 2];
|
|
|
|
|
int32_t size;
|
|
|
|
|
struct stat fstat;
|
|
|
|
|
int32_t code = -1;
|
|
|
|
|
int64_t index = 0;
|
|
|
|
|
|
|
|
|
@ -350,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
// retrieve wal info
|
|
|
|
|
wname[0] = 0;
|
|
|
|
|
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
|
|
|
|
|
if (code < 0) break; // error
|
|
|
|
|
if (code < 0) {
|
|
|
|
|
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (wname[0] == 0) { // no wal file
|
|
|
|
|
sDebug("%s, no wal file", pPeer->id);
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, no wal file anymore", pPeer->id);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -364,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
// get the full path to wal file
|
|
|
|
|
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
|
|
|
|
|
|
|
|
|
|
// send wal file,
|
|
|
|
|
// inotify is not required, old wal file won't be modified, even remove is ok
|
|
|
|
|
if (stat(fname, &fstat) < 0) break;
|
|
|
|
|
size = fstat.st_size;
|
|
|
|
|
// send wal file, old wal file won't be modified, even remove is ok
|
|
|
|
|
struct stat fstat;
|
|
|
|
|
if (stat(fname, &fstat) < 0) {
|
|
|
|
|
code = -1;
|
|
|
|
|
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size = fstat.st_size;
|
|
|
|
|
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
|
|
|
|
|
|
|
|
|
|
int32_t sfd = open(fname, O_RDONLY);
|
|
|
|
|
if (sfd < 0) break;
|
|
|
|
|
if (sfd < 0) {
|
|
|
|
|
code = -1;
|
|
|
|
|
sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
|
|
|
|
|
close(sfd);
|
|
|
|
|
if (code < 0) break;
|
|
|
|
|
if (code < 0) {
|
|
|
|
|
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer) != 0) break;
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer)) {
|
|
|
|
|
code = -1;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code == 0) {
|
|
|
|
@ -386,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
|
|
|
|
|
SWalHead walHead;
|
|
|
|
|
memset(&walHead, 0, sizeof(walHead));
|
|
|
|
|
code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
|
|
|
|
|
taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
|
|
|
|
|
} else {
|
|
|
|
|
sError("%s, failed to send wal since %s", pPeer->id, strerror(errno));
|
|
|
|
|
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
@ -428,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|
|
|
|
pPeer->sversion = 0;
|
|
|
|
|
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
|
|
|
|
|
sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
|
|
|
|
|
if (syncRetrieveFile(pPeer) < 0) {
|
|
|
|
|
if (syncRetrieveFile(pPeer) != 0) {
|
|
|
|
|
sError("%s, failed to retrieve files", pPeer->id);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
@ -437,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|
|
|
|
if (pPeer->sversion == 0) pPeer->sversion = 1;
|
|
|
|
|
|
|
|
|
|
sInfo("%s, start to retrieve wals", pPeer->id);
|
|
|
|
|
if (syncRetrieveWal(pPeer) < 0) {
|
|
|
|
|
sError("%s, failed to retrieve wals", pPeer->id);
|
|
|
|
|
int32_t code = syncRetrieveWal(pPeer);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -474,7 +478,6 @@ void *syncRetrieveData(void *param) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pPeer->fileChanged = 0;
|
|
|
|
|
taosClose(pPeer->notifyFd);
|
|
|
|
|
taosClose(pPeer->syncFd);
|
|
|
|
|
syncDecPeerRef(pPeer);
|
|
|
|
|
|
|
|
|
|