|
|
|
@ -29,7 +29,7 @@ static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
|
|
|
|
|
sInfo("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -41,12 +41,12 @@ static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
|
|
|
|
|
sInfo("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (wver != pPeer->lastWalVer) {
|
|
|
|
|
sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer);
|
|
|
|
|
sInfo("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -57,7 +57,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
|
|
|
|
sInfo("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -69,13 +69,13 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
|
|
|
|
uint64_t fver, wver;
|
|
|
|
|
int32_t code = (*pNode->getVersionFp)(pNode->vgId, &fver, &wver);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
|
|
|
|
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
|
|
|
|
|
pPeer->fileChanged = 1;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (fver != pPeer->lastFileVer) {
|
|
|
|
|
sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
|
|
|
|
|
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
|
|
|
|
|
pPeer->fileChanged = 1;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -143,13 +143,13 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ret == 0) {
|
|
|
|
|
sDebug("sfd:%d, read to the end of file, ret:%d", sfd, ret);
|
|
|
|
|
sInfo("sfd:%d, read to the end of file, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ret != sizeof(SWalHead)) {
|
|
|
|
|
// file is not at end yet, it shall be reloaded
|
|
|
|
|
sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
|
|
|
|
|
sInfo("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -163,7 +163,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
|
|
|
|
|
|
|
|
|
|
if (ret != pHead->len) {
|
|
|
|
|
// file is not at end yet, it shall be reloaded
|
|
|
|
|
sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
|
|
|
|
|
sInfo("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -184,7 +184,7 @@ static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
|
|
|
|
|
sInfo("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
|
|
|
|
|
|
|
|
|
|
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
|
|
|
|
|
int64_t bytes = 0;
|
|
|
|
@ -198,7 +198,7 @@ static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
|
|
|
|
|
|
|
|
|
if (code == 0) {
|
|
|
|
|
code = bytes;
|
|
|
|
|
sDebug("%s, read to the end of wal, bytes:%" PRId64, pPeer->id, bytes);
|
|
|
|
|
sInfo("%s, read to the end of wal, bytes:%" PRId64, pPeer->id, bytes);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -217,7 +217,7 @@ static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
|
|
|
|
|
|
|
|
|
if (pHead->version >= fversion && fversion > 0) {
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
|
|
|
|
|
sInfo("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -237,7 +237,7 @@ static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
|
|
|
|
|
|
|
|
|
|
// get full path to wal file
|
|
|
|
|
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
|
|
|
|
|
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
|
|
|
|
|
sInfo("%s, start to retrieve last wal:%s", pPeer->id, fname);
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
if (syncAreFilesModified(pNode, pPeer)) return -1;
|
|
|
|
@ -245,7 +245,7 @@ static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
|
|
|
|
|
|
|
|
|
|
int64_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
|
|
|
|
|
if (bytes < 0) {
|
|
|
|
|
sDebug("%s, failed to retrieve last wal, bytes:%" PRId64, pPeer->id, bytes);
|
|
|
|
|
sInfo("%s, failed to retrieve last wal, bytes:%" PRId64, pPeer->id, bytes);
|
|
|
|
|
return bytes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -257,13 +257,13 @@ static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
|
|
|
|
|
if (fversion == 0) {
|
|
|
|
|
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
|
|
|
|
|
fversion = nodeVersion; // must read data to fversion
|
|
|
|
|
sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
|
|
|
|
|
sInfo("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if all data up to fversion is read out, it is over
|
|
|
|
|
if (pPeer->sversion >= fversion && fversion > 0) {
|
|
|
|
|
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%" PRId64 " sver:%" PRIu64, pPeer->id, fversion, bytes,
|
|
|
|
|
sInfo("%s, data up to fver:%" PRIu64 " has been read out, bytes:%" PRId64 " sver:%" PRIu64, pPeer->id, fversion, bytes,
|
|
|
|
|
pPeer->sversion);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
@ -277,7 +277,7 @@ static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
|
|
|
|
|
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
|
|
|
|
|
once = 1;
|
|
|
|
|
offset += bytes;
|
|
|
|
|
sDebug("%s, continue retrieve last wal, bytes:%" PRId64 " offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
|
|
|
|
|
sInfo("%s, continue retrieve last wal, bytes:%" PRId64 " offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
|
|
|
|
|
bytes, offset, pPeer->sversion, fversion);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -303,7 +303,7 @@ static int64_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
|
|
|
|
|
if (wname[0] == 0) { // no wal file
|
|
|
|
|
code = 0;
|
|
|
|
|
sDebug("%s, no wal file anymore", pPeer->id);
|
|
|
|
|
sInfo("%s, no wal file anymore", pPeer->id);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -320,12 +320,12 @@ static int64_t syncRetrieveWal(SSyncPeer *pPeer) {
|
|
|
|
|
struct stat fstat;
|
|
|
|
|
if (stat(fname, &fstat) < 0) {
|
|
|
|
|
code = -1;
|
|
|
|
|
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
|
|
|
|
|
sInfo("%s, failed to stat wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size = fstat.st_size;
|
|
|
|
|
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
|
|
|
|
|
sInfo("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
|
|
|
|
|
|
|
|
|
|
int32_t sfd = open(fname, O_RDONLY | O_BINARY);
|
|
|
|
|
if (sfd < 0) {
|
|
|
|
@ -374,7 +374,7 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
|
|
|
|
|
sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
|
|
|
|
|
sInfo("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
|
|
|
|
|
|
|
|
|
|
SSyncRsp rsp;
|
|
|
|
|
if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
|
|
|
|
@ -382,7 +382,7 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
|
|
|
|
|
sInfo("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|