TD-1455 TD-2196
This commit is contained in:
parent
feb2270a4c
commit
6bf39dfa3b
|
@ -261,6 +261,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||||
|
@ -367,6 +370,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, 0, 0x11A4, "tag value
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find")
|
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string")
|
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string")
|
||||||
|
|
||||||
|
// odbc
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2100, "out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2100, "out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, 0, 0x2101, "convertion not a valid literal input")
|
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, 0, 0x2101, "convertion not a valid literal input")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined")
|
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined")
|
||||||
|
@ -390,7 +394,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_BAD_SEQ, 0, 0x2113, "src bad se
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_INCOMPLETE, 0, 0x2114, "src incomplete")
|
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_INCOMPLETE, 0, 0x2114, "src incomplete")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_GENERAL, 0, 0x2115, "src general")
|
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_GENERAL, 0, 0x2115, "src general")
|
||||||
|
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include <sys/inotify.h>
|
#include <sys/inotify.h>
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -26,28 +27,31 @@
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
|
||||||
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
|
||||||
if (pNode->getFileVersion == NULL) return 0;
|
if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
uint64_t fver = 0;
|
uint64_t fver = 0;
|
||||||
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
|
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sInfo("%s, file are modified while retrieve, lastver:%" PRIu64, pPeer->id, pPeer->lastVer);
|
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer);
|
||||||
return code;
|
pPeer->fileChanged = 1;
|
||||||
|
return TSDB_CODE_SYN_VND_COMMITING;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fver != pPeer->lastVer) {
|
if (fver != pPeer->lastVer) {
|
||||||
sInfo("%s, file are modified while retrieve, fver:%" PRIu64 " lastver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
|
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
|
||||||
return -1;
|
pPeer->fileChanged = 1;
|
||||||
|
return TSDB_CODE_SYN_FILE_CHNAGED;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
pPeer->fileChanged = 0;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
SFileInfo fileInfo = {0};
|
SFileInfo fileInfo = {0};
|
||||||
SFileAck fileAck = {0};
|
SFileAck fileAck = {0};
|
||||||
int32_t code = -1;
|
int32_t code = TSDB_CODE_SYN_APP_ERROR;
|
||||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
|
|
||||||
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
|
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
|
||||||
|
@ -58,25 +62,27 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
||||||
&fileInfo.size, &fileInfo.fversion);
|
&fileInfo.size, &fileInfo.fversion);
|
||||||
// fileInfo.size = htonl(size);
|
// fileInfo.size = htonl(size);
|
||||||
sDebug("%s, file:%s info will be sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
|
||||||
|
|
||||||
// send the file info
|
// send the file info
|
||||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no file anymore, break
|
// if no file anymore, break
|
||||||
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
sDebug("%s, no more files to sync", pPeer->id);
|
sDebug("%s, no more files to sync", pPeer->id);
|
||||||
code = 0;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the ack from peer
|
// wait for the ack from peer
|
||||||
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -97,6 +103,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
// send the file to peer
|
// send the file to peer
|
||||||
int32_t sfd = open(name, O_RDONLY);
|
int32_t sfd = open(name, O_RDONLY);
|
||||||
if (sfd < 0) {
|
if (sfd < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -104,6 +111,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
||||||
close(sfd);
|
close(sfd);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -112,11 +120,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
|
||||||
fileInfo.index++;
|
fileInfo.index++;
|
||||||
|
|
||||||
// check if processed files are modified
|
// check if processed files are modified
|
||||||
if (syncAreFilesModified(pNode, pPeer) != 0) break;
|
code = syncAreFilesModified(pNode, pPeer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code < 0) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
sError("%s, failed to retrieve file", pPeer->id);
|
sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -368,8 +377,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
|
||||||
close(sfd);
|
close(sfd);
|
||||||
if (code < 0) break;
|
if (code < 0) break;
|
||||||
|
|
||||||
index++;
|
|
||||||
|
|
||||||
if (syncAreFilesModified(pNode, pPeer) != 0) break;
|
if (syncAreFilesModified(pNode, pPeer) != 0) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,7 +452,6 @@ void *syncRetrieveData(void *param) {
|
||||||
|
|
||||||
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
||||||
|
|
||||||
pPeer->fileChanged = 0;
|
|
||||||
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
||||||
if (pPeer->syncFd < 0) {
|
if (pPeer->syncFd < 0) {
|
||||||
sError("%s, failed to open socket to sync", pPeer->id);
|
sError("%s, failed to open socket to sync", pPeer->id);
|
||||||
|
|
Loading…
Reference in New Issue