This commit is contained in:
Shengliang Guan 2020-12-01 11:52:43 +08:00
parent 2d410638b8
commit bbafebfbee
1 changed files with 47 additions and 25 deletions

View File

@ -97,6 +97,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
while (1) { while (1) {
// retrieve file info // retrieve file info
fileInfo.name[0] = 0; fileInfo.name[0] = 0;
fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size); // fileInfo.size = htonl(size);
@ -105,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0) { if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
} }
// if no file anymore, break // if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
code = TSDB_CODE_SUCCESS; code = 0;
sDebug("%s, no more files to sync", pPeer->id); sDebug("%s, no more files to sync", pPeer->id);
break; break;
} }
@ -120,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
if (ret < 0) { if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = -1;
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
} }
@ -141,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file to peer // send the file to peer
int32_t sfd = open(name, O_RDONLY); int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) { if (sfd < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = -1;
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
} }
@ -149,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret < 0) { if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = -1;
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
} }
@ -165,7 +166,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code)); sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
} }
return code; return code;
@ -180,7 +181,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
} }
if (ret == 0) { if (ret == 0) {
sDebug("sfd:%d, read to end of the wal, ret:%d", sfd, ret); sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret);
return 0; return 0;
} }
@ -254,7 +255,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
if (pHead->version >= fversion && fversion > 0) { if (pHead->version >= fversion && fversion > 0) {
code = 0; code = 0;
sDebug("%s, retrieve wal finished, fver:%" PRIu64, pPeer->id, fversion); sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
break; break;
} }
} }
@ -294,13 +295,14 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
if (fversion == 0) { if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
fversion = nodeVersion; // must read data to fversion fversion = nodeVersion; // must read data to fversion
sDebug("%s, fversion is 0 and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
} }
} }
// if all data up to fversion is read out, it is over // if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) { if (pPeer->sversion >= fversion && fversion > 0) {
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes,
pPeer->sversion);
return 0; return 0;
} }
@ -324,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
struct stat fstat;
int32_t code = -1; int32_t code = -1;
int64_t index = 0; int64_t index = 0;
@ -332,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// retrieve wal info // retrieve wal info
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break; // error if (code < 0) {
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
break;
}
if (wname[0] == 0) { // no wal file if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id); code = 0;
sDebug("%s, no wal file anymore", pPeer->id);
break; break;
} }
@ -346,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// get the full path to wal file // get the full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
// send wal file, // send wal file, old wal file won't be modified, even remove is ok
// inotify is not required, old wal file won't be modified, even remove is ok struct stat fstat;
if (stat(fname, &fstat) < 0) break; if (stat(fname, &fstat) < 0) {
size = fstat.st_size; code = -1;
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
int32_t sfd = open(fname, O_RDONLY); int32_t sfd = open(fname, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) {
code = -1;
sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
code = taosSendFile(pPeer->syncFd, sfd, NULL, size); code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code < 0) break; if (code < 0) {
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
if (syncAreFilesModified(pNode, pPeer)) break; if (syncAreFilesModified(pNode, pPeer)) {
code = -1;
break;
}
} }
if (code == 0) { if (code == 0) {
@ -368,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
SWalHead walHead; SWalHead walHead;
memset(&walHead, 0, sizeof(walHead)); memset(&walHead, 0, sizeof(walHead));
code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
} else { } else {
sError("%s, failed to send wal since %s", pPeer->id, strerror(errno)); sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
} }
return code; return code;
@ -410,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer->sversion = 0; pPeer->sversion = 0;
pPeer->sstatus = TAOS_SYNC_STATUS_FILE; pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
if (syncRetrieveFile(pPeer) < 0) { if (syncRetrieveFile(pPeer) != 0) {
sError("%s, failed to retrieve files", pPeer->id); sError("%s, failed to retrieve files", pPeer->id);
return -1; return -1;
} }
@ -419,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
if (pPeer->sversion == 0) pPeer->sversion = 1; if (pPeer->sversion == 0) pPeer->sversion = 1;
sInfo("%s, start to retrieve wals", pPeer->id); sInfo("%s, start to retrieve wals", pPeer->id);
if (syncRetrieveWal(pPeer) < 0) { int32_t code = syncRetrieveWal(pPeer);
sError("%s, failed to retrieve wals", pPeer->id); if (code != 0) {
sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code);
return -1; return -1;
} }