commit
72cef1d81d
|
@ -147,13 +147,13 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
|||
}
|
||||
|
||||
bool dnodeCheckMnodeStarting() {
|
||||
if (tsModuleStatus & TSDB_MOD_MNODE) return false;
|
||||
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) return false;
|
||||
|
||||
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
||||
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
||||
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
|
||||
if (node->nodeId == dnodeGetDnodeId()) {
|
||||
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);;
|
||||
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);
|
||||
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
|
||||
dnodeProcessModuleStatus(moduleStatus);
|
||||
return true;
|
||||
|
|
|
@ -2093,11 +2093,11 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->tableType = pTable->info.type;
|
||||
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
|
||||
if (pTable->superTable) {
|
||||
if (pTable->superTable != NULL) {
|
||||
tstrncpy(pMeta->sTableId, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
|
||||
}
|
||||
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||
if (pTable->info.type == TSDB_CHILD_TABLE && pTable->superTable != NULL) {
|
||||
pMeta->sversion = htons(pTable->superTable->sversion);
|
||||
pMeta->tversion = htons(pTable->superTable->tversion);
|
||||
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
|
||||
|
|
|
@ -126,12 +126,13 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
|||
dstFp = gzdopen(fd, "wb6f");
|
||||
if (dstFp == NULL) {
|
||||
ret = -3;
|
||||
close(fd);
|
||||
goto cmp_end;
|
||||
}
|
||||
|
||||
while (!feof(srcFp)) {
|
||||
len = (int32_t)fread(data, 1, COMPRESS_STEP_SIZE, srcFp);
|
||||
gzwrite(dstFp, data, len);
|
||||
(void)gzwrite(dstFp, data, len);
|
||||
}
|
||||
|
||||
cmp_end:
|
||||
|
|
|
@ -153,10 +153,10 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target
|
|||
for (int32_t i = 0; i < HTTP_MAX_URL; i++) {
|
||||
char *pSeek = strchr(pStart, '/');
|
||||
if (pSeek == NULL) {
|
||||
httpAppendString(pParser->path + i, pStart, strlen(pStart));
|
||||
(void)httpAppendString(pParser->path + i, pStart, strlen(pStart));
|
||||
break;
|
||||
} else {
|
||||
httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart));
|
||||
(void)httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart));
|
||||
}
|
||||
pStart = pSeek + 1;
|
||||
}
|
||||
|
@ -485,11 +485,11 @@ void httpClearParser(HttpParser *parser) {
|
|||
}
|
||||
|
||||
void httpDestroyParser(HttpParser *parser) {
|
||||
if (!parser) return;
|
||||
|
||||
HttpContext *pContext = parser->pContext;
|
||||
httpTrace("context:%p, fd:%d, destroy parser", pContext, pContext->fd);
|
||||
|
||||
if (!parser) return;
|
||||
|
||||
free(parser->method); parser->method = NULL;
|
||||
free(parser->target); parser->target = NULL;
|
||||
free(parser->version); parser->version = NULL;
|
||||
|
@ -684,23 +684,28 @@ static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state,
|
|||
break;
|
||||
}
|
||||
|
||||
if (c!='0' && c!='1') {
|
||||
if (c != '0' && c != '1' && c != '2') {
|
||||
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
|
||||
ok = -1;
|
||||
httpOnError(parser, 400, TSDB_CODE_HTTP_PARSE_VERSION_FAILED);
|
||||
break;
|
||||
}
|
||||
|
||||
if (httpAppendString(&parser->str, &c, 1)) {
|
||||
httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c);
|
||||
ok = -1;
|
||||
httpOnError(parser, 507, TSDB_CODE_HTTP_PARSE_VERSION_FAILED);
|
||||
break;
|
||||
}
|
||||
|
||||
if (c == '0') parser->httpVersion = HTTP_VERSION_10;
|
||||
else if (c == '1') parser->httpVersion = HTTP_VERSION_11;
|
||||
else if (c == '2') parser->httpVersion = HTTP_VERSION_12;
|
||||
else parser->httpVersion = HTTP_INVALID_VERSION;
|
||||
|
||||
if (c == '0')
|
||||
parser->httpVersion = HTTP_VERSION_10;
|
||||
else if (c == '1')
|
||||
parser->httpVersion = HTTP_VERSION_11;
|
||||
else if (c == '2')
|
||||
parser->httpVersion = HTTP_VERSION_12;
|
||||
else {
|
||||
}
|
||||
|
||||
parser->version = strdup(parser->str.str);
|
||||
if (!parser->version) {
|
||||
|
|
|
@ -114,26 +114,26 @@ typedef struct {
|
|||
} SSyncFwds;
|
||||
|
||||
typedef struct SsyncPeer {
|
||||
int32_t nodeId;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
char fqdn[TSDB_FQDN_LEN]; // peer ip string
|
||||
char id[TSDB_EP_LEN+16]; // peer vgId + end point
|
||||
int8_t role;
|
||||
int8_t sstatus; // sync status
|
||||
uint64_t version;
|
||||
uint64_t sversion; // track the peer version in retrieve process
|
||||
int syncFd;
|
||||
int peerFd; // forward FD
|
||||
int numOfRetrieves; // number of retrieves tried
|
||||
int fileChanged; // a flag to indicate file is changed during retrieving process
|
||||
void *timer;
|
||||
void *pConn;
|
||||
int notifyFd;
|
||||
int watchNum;
|
||||
int *watchFd;
|
||||
int8_t refCount; // reference count
|
||||
struct SSyncNode *pSyncNode;
|
||||
int32_t nodeId;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
char fqdn[TSDB_FQDN_LEN]; // peer ip string
|
||||
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
|
||||
int8_t role;
|
||||
int8_t sstatus; // sync status
|
||||
uint64_t version;
|
||||
uint64_t sversion; // track the peer version in retrieve process
|
||||
int syncFd;
|
||||
int peerFd; // forward FD
|
||||
int numOfRetrieves; // number of retrieves tried
|
||||
int fileChanged; // a flag to indicate file is changed during retrieving process
|
||||
void * timer;
|
||||
void * pConn;
|
||||
int notifyFd;
|
||||
int watchNum;
|
||||
int * watchFd;
|
||||
int8_t refCount; // reference count
|
||||
struct SSyncNode *pSyncNode;
|
||||
} SSyncPeer;
|
||||
|
||||
typedef struct SSyncNode {
|
||||
|
@ -171,7 +171,6 @@ void syncBroadcastStatus(SSyncNode *pNode);
|
|||
void syncAddPeerRef(SSyncPeer *pPeer);
|
||||
int syncDecPeerRef(SSyncPeer *pPeer);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -38,7 +38,6 @@ void taosCloseTcpThreadPool(ttpool_h);
|
|||
void *taosAllocateTcpConn(void *, void *ahandle, int connFd);
|
||||
void taosFreeTcpConn(void *);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -134,7 +134,7 @@ void syncCleanUp() {
|
|||
void *syncStart(const SSyncInfo *pInfo) {
|
||||
const SSyncCfg *pCfg = &pInfo->syncCfg;
|
||||
|
||||
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1);
|
||||
SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1);
|
||||
if (pNode == NULL) {
|
||||
sError("no memory to allocate syncNode");
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -168,7 +168,7 @@ void *syncStart(const SSyncInfo *pInfo) {
|
|||
}
|
||||
|
||||
syncAddNodeRef(pNode);
|
||||
|
||||
|
||||
if (pNode->selfIndex < 0) {
|
||||
sInfo("vgId:%d, this node is not configured", pNode->vgId);
|
||||
terrno = TSDB_CODE_SYN_INVALID_CONFIG;
|
||||
|
@ -176,11 +176,12 @@ void *syncStart(const SSyncInfo *pInfo) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
nodeVersion = pInfo->version; // set the initial version
|
||||
nodeVersion = pInfo->version; // set the initial version
|
||||
nodeRole = (pNode->replica > 1) ? TAOS_SYNC_ROLE_UNSYNCED : TAOS_SYNC_ROLE_MASTER;
|
||||
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]);
|
||||
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
|
||||
syncRole[nodeRole]);
|
||||
|
||||
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo*sizeof(SFwdInfo), 1);
|
||||
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1);
|
||||
if (pNode->pSyncFwds == NULL) {
|
||||
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -443,9 +444,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
|
|||
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
|
||||
}
|
||||
|
||||
static void syncAddNodeRef(SSyncNode *pNode) {
|
||||
atomic_add_fetch_8(&pNode->refCount, 1);
|
||||
}
|
||||
static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); }
|
||||
|
||||
static void syncDecNodeRef(SSyncNode *pNode) {
|
||||
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
|
||||
|
@ -456,9 +455,7 @@ static void syncDecNodeRef(SSyncNode *pNode) {
|
|||
}
|
||||
}
|
||||
|
||||
void syncAddPeerRef(SSyncPeer *pPeer) {
|
||||
atomic_add_fetch_8(&pPeer->refCount, 1);
|
||||
}
|
||||
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); }
|
||||
|
||||
int syncDecPeerRef(SSyncPeer *pPeer) {
|
||||
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
|
||||
|
@ -501,6 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
|||
tstrncpy(pPeer->fqdn, pInfo->nodeFqdn, sizeof(pPeer->fqdn));
|
||||
pPeer->ip = ip;
|
||||
pPeer->port = pInfo->nodePort;
|
||||
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
|
||||
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%d", pNode->vgId, pPeer->fqdn, pPeer->port);
|
||||
|
||||
pPeer->peerFd = -1;
|
||||
|
@ -573,10 +571,10 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
|||
replica = pNode->replica + 1;
|
||||
}
|
||||
|
||||
if (index < 0 && onlineNum > replica/2.0) {
|
||||
if (index < 0 && onlineNum > replica / 2.0) {
|
||||
// over half of nodes are online
|
||||
for (int i = 0; i < pNode->replica; ++i) {
|
||||
//slave with highest version shall be master
|
||||
// slave with highest version shall be master
|
||||
pPeer = pNode->peerInfo[i];
|
||||
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
|
||||
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
|
||||
|
@ -622,7 +620,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
|
|||
if (onlineNum <= replica * 0.5) {
|
||||
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
|
||||
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
|
||||
pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
|
||||
// pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
|
||||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
|
||||
}
|
||||
|
@ -648,7 +646,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
|
|||
|
||||
static int syncValidateMaster(SSyncPeer *pPeer) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int code = 0;
|
||||
int code = 0;
|
||||
|
||||
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
|
||||
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
|
||||
|
@ -671,7 +669,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
|
|||
int8_t selfOldRole = nodeRole;
|
||||
int8_t i, syncRequired = 0;
|
||||
|
||||
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
||||
// pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
||||
pPeer->role = newRole;
|
||||
|
||||
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
|
||||
|
@ -877,8 +875,6 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
|||
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
|
||||
|
@ -923,7 +919,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
|||
static int syncProcessPeerMsg(void *param, void *buffer) {
|
||||
SSyncPeer *pPeer = param;
|
||||
SSyncHead head;
|
||||
char * cont = (char *)buffer;
|
||||
char * cont = buffer;
|
||||
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
pthread_mutex_lock(&(pNode->mutex));
|
||||
|
@ -1066,7 +1062,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t vgId = firstPkt.syncHead.vgId;
|
||||
int32_t vgId = firstPkt.syncHead.vgId;
|
||||
SSyncNode **ppNode = (SSyncNode **)taosHashGet(vgIdHash, (const char *)&vgId, sizeof(int32_t));
|
||||
if (ppNode == NULL || *ppNode == NULL) {
|
||||
sError("vgId:%d, vgId could not be found", vgId);
|
||||
|
|
|
@ -23,10 +23,10 @@
|
|||
#include "tsync.h"
|
||||
#include "syncInt.h"
|
||||
|
||||
static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eindex) {
|
||||
char name[TSDB_FILENAME_LEN*2] = {0};
|
||||
char fname[TSDB_FILENAME_LEN*3] = {0};
|
||||
uint32_t magic;
|
||||
static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) {
|
||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||
char fname[TSDB_FILENAME_LEN * 3] = {0};
|
||||
uint32_t magic;
|
||||
uint64_t fversion;
|
||||
int64_t size;
|
||||
uint32_t index = sindex;
|
||||
|
@ -40,12 +40,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
|
|||
if (magic == 0) break;
|
||||
|
||||
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
|
||||
remove(fname);
|
||||
(void)remove(fname);
|
||||
sDebug("%s, %s is removed", pPeer->id, fname);
|
||||
|
||||
index++;
|
||||
if (index > eindex) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||
|
@ -62,35 +62,36 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
while (1) {
|
||||
// read file info
|
||||
int ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
|
||||
if (ret < 0 ) break;
|
||||
if (ret < 0) break;
|
||||
|
||||
// if no more file from master, break;
|
||||
if (minfo.name[0] == 0 || minfo.magic == 0) {
|
||||
sDebug("%s, no more files to restore", pPeer->id);
|
||||
|
||||
// remove extra files after the current index
|
||||
syncRemoveExtraFile(pPeer, sinfo.index+1, TAOS_SYNC_MAX_INDEX);
|
||||
code = 0;
|
||||
syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
|
||||
code = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
// remove extra files on slave between the current and last index
|
||||
syncRemoveExtraFile(pPeer, pindex+1, minfo.index-1);
|
||||
syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
|
||||
pindex = minfo.index;
|
||||
|
||||
// check the file info
|
||||
sinfo = minfo;
|
||||
sDebug("%s, get file info:%s", pPeer->id, minfo.name);
|
||||
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
|
||||
sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
|
||||
&sinfo.fversion);
|
||||
|
||||
// if file not there or magic is not the same, file shall be synced
|
||||
memset(&fileAck, 0, sizeof(fileAck));
|
||||
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1:0;
|
||||
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
|
||||
|
||||
// send file ack
|
||||
ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
|
||||
if (ret <0) break;
|
||||
|
||||
if (ret < 0) break;
|
||||
|
||||
// if sync is not required, continue
|
||||
if (fileAck.sync == 0) {
|
||||
sDebug("%s, %s is the same", pPeer->id, minfo.name);
|
||||
|
@ -99,10 +100,11 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
|
||||
// if sync is required, open file, receive from master, and write to file
|
||||
// get the full path to file
|
||||
minfo.name[sizeof(minfo.name) - 1] = 0;
|
||||
snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);
|
||||
|
||||
int dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if ( dfd < 0 ) {
|
||||
if (dfd < 0) {
|
||||
sError("%s, failed to open file:%s", pPeer->id, name);
|
||||
break;
|
||||
}
|
||||
|
@ -110,16 +112,15 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
|
||||
fsync(dfd);
|
||||
close(dfd);
|
||||
if (ret<0) break;
|
||||
if (ret < 0) break;
|
||||
|
||||
sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
|
||||
|
||||
}
|
||||
|
||||
if (code == 0 && (minfo.fversion != sinfo.fversion)) {
|
||||
// data file is changed, code shall be set to 1
|
||||
// data file is changed, code shall be set to 1
|
||||
*fversion = minfo.fversion;
|
||||
code = 1;
|
||||
code = 1;
|
||||
}
|
||||
|
||||
if (code < 0) {
|
||||
|
@ -130,8 +131,8 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
|||
}
|
||||
|
||||
static int syncRestoreWal(SSyncPeer *pPeer) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int ret, code = -1;
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int ret, code = -1;
|
||||
|
||||
void *buffer = calloc(1024000, 1); // size for one record
|
||||
if (buffer == NULL) return -1;
|
||||
|
@ -140,18 +141,21 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
|
|||
|
||||
while (1) {
|
||||
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
|
||||
if (ret <0) break;
|
||||
if (ret < 0) break;
|
||||
|
||||
if (pHead->len == 0) {
|
||||
code = 0;
|
||||
break;
|
||||
} // wal sync over
|
||||
|
||||
if (pHead->len == 0) {code = 0; break;} // wal sync over
|
||||
|
||||
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
|
||||
if (ret <0) break;
|
||||
if (ret < 0) break;
|
||||
|
||||
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL);
|
||||
}
|
||||
|
||||
if (code<0) {
|
||||
if (code < 0) {
|
||||
sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno));
|
||||
}
|
||||
|
||||
|
@ -159,10 +163,9 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
|
||||
{
|
||||
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SWalHead *pHead = (SWalHead *) offset;
|
||||
SWalHead * pHead = (SWalHead *)offset;
|
||||
|
||||
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
|
||||
offset += pHead->len + sizeof(SWalHead);
|
||||
|
@ -171,7 +174,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
|
|||
}
|
||||
|
||||
static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SSyncNode * pNode = pPeer->pSyncNode;
|
||||
SRecvBuffer *pRecv = pNode->pRecv;
|
||||
int forwards = 0;
|
||||
|
||||
|
@ -182,7 +185,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
|
|||
offset = syncProcessOneBufferedFwd(pPeer, offset);
|
||||
forwards++;
|
||||
}
|
||||
|
||||
|
||||
pthread_mutex_lock(&pNode->mutex);
|
||||
|
||||
while (forwards < pRecv->forwards && pRecv->code == 0) {
|
||||
|
@ -199,7 +202,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
|
|||
}
|
||||
|
||||
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SSyncNode * pNode = pPeer->pSyncNode;
|
||||
SRecvBuffer *pRecv = pNode->pRecv;
|
||||
|
||||
if (pRecv == NULL) return -1;
|
||||
|
@ -259,9 +262,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// if code > 0, data file is changed, notify app, and pass the version
|
||||
// if code > 0, data file is changed, notify app, and pass the version
|
||||
if (code > 0 && pNode->notifyFileSynced) {
|
||||
if ( (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0 ) {
|
||||
if ((*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) {
|
||||
sError("%s, app not in ready state", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
@ -296,8 +299,8 @@ void *syncRestoreData(void *param) {
|
|||
|
||||
if (syncOpenRecvBuffer(pNode) < 0) {
|
||||
sError("%s, failed to allocate recv buffer", pPeer->id);
|
||||
} else {
|
||||
if ( syncRestoreDataStepByStep(pPeer) == 0) {
|
||||
} else {
|
||||
if (syncRestoreDataStepByStep(pPeer) == 0) {
|
||||
sInfo("%s, it is synced successfully", pPeer->id);
|
||||
nodeRole = TAOS_SYNC_ROLE_SLAVE;
|
||||
syncBroadcastStatus(pNode);
|
||||
|
@ -311,7 +314,7 @@ void *syncRestoreData(void *param) {
|
|||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||
|
||||
nodeSStatus = TAOS_SYNC_STATUS_INIT;
|
||||
taosClose(pPeer->syncFd)
|
||||
taosClose(pPeer->syncFd);
|
||||
syncCloseRecvBuffer(pNode);
|
||||
__sync_fetch_and_sub(&tsSyncNum, 1);
|
||||
syncDecPeerRef(pPeer);
|
||||
|
|
|
@ -38,13 +38,13 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles);
|
||||
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles);
|
||||
if (pPeer->watchFd == NULL) {
|
||||
sError("%s, failed to allocate watchFd", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles);
|
||||
memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles);
|
||||
}
|
||||
|
||||
int *wd = pPeer->watchFd + pPeer->watchNum;
|
||||
|
@ -64,7 +64,7 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
|
|||
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
|
||||
}
|
||||
|
||||
pPeer->watchNum = (pPeer->watchNum +1) % tsMaxWatchFiles;
|
||||
pPeer->watchNum = (pPeer->watchNum + 1) % tsMaxWatchFiles;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -72,20 +72,20 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
|
|||
static int syncAreFilesModified(SSyncPeer *pPeer) {
|
||||
if (pPeer->notifyFd <= 0) return 0;
|
||||
|
||||
char buf[2048];
|
||||
int len = read(pPeer->notifyFd, buf, sizeof(buf));
|
||||
char buf[2048];
|
||||
int len = read(pPeer->notifyFd, buf, sizeof(buf));
|
||||
if (len < 0 && errno != EAGAIN) {
|
||||
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
|
||||
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int code = 0;
|
||||
if (len > 0) {
|
||||
|
||||
int code = 0;
|
||||
if (len > 0) {
|
||||
const struct inotify_event *event;
|
||||
char *ptr;
|
||||
for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
|
||||
event = (const struct inotify_event *) ptr;
|
||||
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
|
||||
event = (const struct inotify_event *)ptr;
|
||||
if ((event->mask & IN_MODIFY) || (event->mask & IN_DELETE)) {
|
||||
sDebug("%s, processed file is changed", pPeer->id);
|
||||
pPeer->fileChanged = 1;
|
||||
code = 1;
|
||||
|
@ -98,11 +98,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) {
|
|||
}
|
||||
|
||||
static int syncRetrieveFile(SSyncPeer *pPeer) {
|
||||
SSyncNode * pNode = pPeer->pSyncNode;
|
||||
SFileInfo fileInfo;
|
||||
SFileAck fileAck;
|
||||
int code = -1;
|
||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SFileInfo fileInfo;
|
||||
SFileAck fileAck;
|
||||
int code = -1;
|
||||
char name[TSDB_FILENAME_LEN * 2] = {0};
|
||||
|
||||
memset(&fileInfo, 0, sizeof(fileInfo));
|
||||
memset(&fileAck, 0, sizeof(fileAck));
|
||||
|
@ -110,17 +110,19 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
while (1) {
|
||||
// retrieve file info
|
||||
fileInfo.name[0] = 0;
|
||||
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion);
|
||||
//fileInfo.size = htonl(size);
|
||||
fileInfo.magic = (*pNode->getFileInfo)(pNode->ahandle, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
|
||||
&fileInfo.size, &fileInfo.fversion);
|
||||
// fileInfo.size = htonl(size);
|
||||
|
||||
// send the file info
|
||||
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
|
||||
if (ret < 0 ) break;
|
||||
if (ret < 0) break;
|
||||
|
||||
// if no file anymore, break
|
||||
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
||||
sDebug("%s, no more files to sync", pPeer->id);
|
||||
code = 0; break;
|
||||
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
|
||||
sDebug("%s, no more files to sync", pPeer->id);
|
||||
code = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
// wait for the ack from peer
|
||||
|
@ -132,29 +134,29 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
|
|||
|
||||
// get the full path to file
|
||||
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
|
||||
|
||||
|
||||
// add the file into watch list
|
||||
if ( syncAddIntoWatchList(pPeer, name) <0) break;
|
||||
if (syncAddIntoWatchList(pPeer, name) < 0) break;
|
||||
|
||||
// if sync is not required, continue
|
||||
if (fileAck.sync == 0) {
|
||||
fileInfo.index++;
|
||||
sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
|
||||
continue;
|
||||
fileInfo.index++;
|
||||
sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// send the file to peer
|
||||
int sfd = open(name, O_RDONLY);
|
||||
if (sfd < 0) break;
|
||||
|
||||
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
||||
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
|
||||
close(sfd);
|
||||
if (ret < 0) break;
|
||||
|
||||
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
|
||||
fileInfo.index++;
|
||||
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
|
||||
fileInfo.index++;
|
||||
|
||||
// check if processed files are modified
|
||||
// check if processed files are modified
|
||||
if (syncAreFilesModified(pPeer) != 0) break;
|
||||
}
|
||||
|
||||
|
@ -201,15 +203,15 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int)*tsMaxWatchFiles);
|
||||
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int) * tsMaxWatchFiles);
|
||||
if (pPeer->watchFd == NULL) {
|
||||
sError("%s, failed to allocate watchFd", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(pPeer->watchFd, -1, sizeof(int)*tsMaxWatchFiles);
|
||||
memset(pPeer->watchFd, -1, sizeof(int) * tsMaxWatchFiles);
|
||||
int *wd = pPeer->watchFd;
|
||||
|
||||
|
||||
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
|
||||
if (*wd == -1) {
|
||||
sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno));
|
||||
|
@ -219,8 +221,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
|
||||
char buf[2048];
|
||||
static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
|
||||
char buf[2048];
|
||||
int len = read(pPeer->notifyFd, buf, sizeof(buf));
|
||||
if (len < 0 && errno != EAGAIN) {
|
||||
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
|
||||
|
@ -231,26 +233,29 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
|
|||
|
||||
struct inotify_event *event;
|
||||
for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
|
||||
event = (struct inotify_event *) ptr;
|
||||
event = (struct inotify_event *)ptr;
|
||||
if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
|
||||
if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
|
||||
}
|
||||
|
||||
if (pEvent != 0)
|
||||
sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
|
||||
if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
|
||||
SWalHead *pHead = (SWalHead *) malloc(640000);
|
||||
int code = -1;
|
||||
int32_t bytes = 0;
|
||||
int sfd;
|
||||
SWalHead *pHead = malloc(640000);
|
||||
int code = -1;
|
||||
int32_t bytes = 0;
|
||||
int sfd;
|
||||
|
||||
sfd = open(name, O_RDONLY);
|
||||
if (sfd < 0) return -1;
|
||||
lseek(sfd, offset, SEEK_SET);
|
||||
if (sfd < 0) {
|
||||
free(pHead);
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)lseek(sfd, offset, SEEK_SET);
|
||||
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
|
||||
|
||||
while (1) {
|
||||
|
@ -263,34 +268,34 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
|
|||
|
||||
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
||||
if ( ret != wsize ) break;
|
||||
if (ret != wsize) break;
|
||||
pPeer->sversion = pHead->version;
|
||||
|
||||
bytes += wsize;
|
||||
|
||||
|
||||
if (pHead->version >= fversion && fversion > 0) {
|
||||
code = 0;
|
||||
bytes = 0;
|
||||
code = 0;
|
||||
bytes = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
free(pHead);
|
||||
taosClose(sfd);
|
||||
close(sfd);
|
||||
|
||||
if (code == 0) return bytes;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int code = -1;
|
||||
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
int code = -1;
|
||||
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
|
||||
|
||||
if (syncAreFilesModified(pPeer) != 0) return -1;
|
||||
|
||||
while (1) {
|
||||
int32_t once = 0; // last WAL has once ever been processed
|
||||
int32_t once = 0; // last WAL has once ever been processed
|
||||
int64_t offset = 0;
|
||||
uint64_t fversion = 0;
|
||||
uint32_t event = 0;
|
||||
|
@ -300,48 +305,48 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
|
|||
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
|
||||
|
||||
// monitor last wal
|
||||
if (syncMonitorLastWal(pPeer, fname) <0) break;
|
||||
if (syncMonitorLastWal(pPeer, fname) < 0) break;
|
||||
|
||||
while (1) {
|
||||
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
|
||||
if (bytes < 0) break;
|
||||
|
||||
// check file changes
|
||||
if (syncCheckLastWalChanges(pPeer, &event) <0) break;
|
||||
if (syncCheckLastWalChanges(pPeer, &event) < 0) break;
|
||||
|
||||
// if file is not updated or updated once, set the fversion and sstatus
|
||||
if (((event & IN_MODIFY) == 0) || once) {
|
||||
if (fversion == 0) {
|
||||
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
|
||||
fversion = nodeVersion; // must read data to fversion
|
||||
fversion = nodeVersion; // must read data to fversion
|
||||
}
|
||||
}
|
||||
|
||||
// if all data up to fversion is read out, it is over
|
||||
if (pPeer->sversion >= fversion && fversion > 0) {
|
||||
code = 0;
|
||||
code = 0;
|
||||
sDebug("%s, data up to fversion:%ld has been read out, bytes:%d", pPeer->id, fversion, bytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// if all data are read out, and no update
|
||||
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
|
||||
// wal file is closed, break
|
||||
if (event & IN_CLOSE_WRITE) {
|
||||
code = 0;
|
||||
if (event & IN_CLOSE_WRITE) {
|
||||
code = 0;
|
||||
sDebug("%s, current wal is closed", pPeer->id);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
// wal not closed, it means some data not flushed to disk, wait for a while
|
||||
usleep(10000);
|
||||
}
|
||||
|
||||
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
|
||||
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
|
||||
once = 1;
|
||||
offset += bytes;
|
||||
offset += bytes;
|
||||
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
|
||||
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
|
||||
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
|
||||
}
|
||||
|
||||
if (code < 0) break;
|
||||
|
@ -356,7 +361,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
|
|||
break;
|
||||
}
|
||||
|
||||
// current last wal is closed, there is a new one
|
||||
// current last wal is closed, there is a new one
|
||||
sDebug("%s, last wal is closed, try new one", pPeer->id);
|
||||
}
|
||||
|
||||
|
@ -377,14 +382,14 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
|
|||
while (1) {
|
||||
// retrieve wal info
|
||||
wname[0] = 0;
|
||||
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
|
||||
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
|
||||
if (code < 0) break; // error
|
||||
if (wname[0] == 0) { // no wal file
|
||||
sDebug("%s, no wal file", pPeer->id);
|
||||
break;
|
||||
}
|
||||
|
||||
if (code == 0) { // last wal
|
||||
}
|
||||
|
||||
if (code == 0) { // last wal
|
||||
code = syncProcessLastWal(pPeer, wname, index);
|
||||
break;
|
||||
}
|
||||
|
@ -392,26 +397,26 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
|
|||
// get the full path to wal file
|
||||
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
|
||||
|
||||
// send wal file,
|
||||
// send wal file,
|
||||
// inotify is not required, old wal file won't be modified, even remove is ok
|
||||
if (stat(fname, &fstat) < 0) break;
|
||||
size = fstat.st_size;
|
||||
|
||||
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
|
||||
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
|
||||
int sfd = open(fname, O_RDONLY);
|
||||
if (sfd < 0) break;
|
||||
|
||||
code = taosTSendFile(pPeer->syncFd, sfd, NULL, size);
|
||||
close(sfd);
|
||||
if (code <0) break;
|
||||
code = taosTSendFile(pPeer->syncFd, sfd, NULL, size);
|
||||
close(sfd);
|
||||
if (code < 0) break;
|
||||
|
||||
index++;
|
||||
index++;
|
||||
|
||||
if (syncAreFilesModified(pPeer) != 0) break;
|
||||
if (syncAreFilesModified(pPeer) != 0) break;
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
sDebug("%s, wal retrieve is finished", pPeer->id);
|
||||
sDebug("%s, wal retrieve is finished", pPeer->id);
|
||||
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
|
||||
SWalHead walHead;
|
||||
memset(&walHead, 0, sizeof(walHead));
|
||||
|
@ -433,12 +438,12 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||
firstPkt.port = tsSyncPort;
|
||||
|
||||
if (write(pPeer->syncFd, (char *) &firstPkt, sizeof(firstPkt)) < 0) {
|
||||
if (write(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) {
|
||||
sError("%s, failed to send syncCmd", pPeer->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pPeer->sversion = 0;
|
||||
pPeer->sversion = 0;
|
||||
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
|
||||
sDebug("%s, start to retrieve file", pPeer->id);
|
||||
if (syncRetrieveFile(pPeer) < 0) {
|
||||
|
@ -447,8 +452,7 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|||
}
|
||||
|
||||
// if no files are synced, there must be wal to sync, sversion must be larger than one
|
||||
if (pPeer->sversion == 0)
|
||||
pPeer->sversion = 1;
|
||||
if (pPeer->sversion == 0) pPeer->sversion = 1;
|
||||
|
||||
sDebug("%s, start to retrieve wal", pPeer->id);
|
||||
if (syncRetrieveWal(pPeer) < 0) {
|
||||
|
@ -460,8 +464,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
|
|||
}
|
||||
|
||||
void *syncRetrieveData(void *param) {
|
||||
SSyncPeer * pPeer = (SSyncPeer *)param;
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SSyncPeer *pPeer = (SSyncPeer *)param;
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
taosBlockSIGPIPE();
|
||||
|
||||
pPeer->fileChanged = 0;
|
||||
|
@ -470,7 +474,7 @@ void *syncRetrieveData(void *param) {
|
|||
sError("%s, failed to open socket to sync", pPeer->id);
|
||||
} else {
|
||||
sInfo("%s, sync tcp is setup", pPeer->id);
|
||||
|
||||
|
||||
if (syncRetrieveDataStepByStep(pPeer) == 0) {
|
||||
sDebug("%s, sync retrieve process is successful", pPeer->id);
|
||||
} else {
|
||||
|
@ -482,12 +486,11 @@ void *syncRetrieveData(void *param) {
|
|||
if (pPeer->fileChanged) {
|
||||
// if file is changed 3 times continuously, start flow control
|
||||
pPeer->numOfRetrieves++;
|
||||
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
|
||||
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
|
||||
(*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2));
|
||||
} else {
|
||||
pPeer->numOfRetrieves = 0;
|
||||
if (pNode->notifyFlowCtrl)
|
||||
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
|
||||
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
|
||||
}
|
||||
|
||||
pPeer->fileChanged = 0;
|
||||
|
|
|
@ -45,8 +45,8 @@ typedef struct {
|
|||
|
||||
static void *taosAcceptPeerTcpConnection(void *argv);
|
||||
static void *taosProcessTcpData(void *param);
|
||||
static void taosStopPoolThread(SThreadObj *pThread);
|
||||
static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
|
||||
static void taosStopPoolThread(SThreadObj* pThread);
|
||||
|
||||
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
|
||||
pthread_attr_t thattr;
|
||||
|
@ -58,8 +58,8 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
|
|||
}
|
||||
|
||||
pPool->info = *pInfo;
|
||||
|
||||
pPool->pThread = (SThreadObj **) calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
|
||||
|
||||
pPool->pThread = (SThreadObj **)calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
|
||||
if (pPool->pThread == NULL) {
|
||||
uError("TCP server, no enough memory");
|
||||
free(pPool);
|
||||
|
@ -68,17 +68,19 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
|
|||
|
||||
pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
|
||||
if (pPool->acceptFd < 0) {
|
||||
free(pPool->pThread); free(pPool);
|
||||
free(pPool->pThread);
|
||||
free(pPool);
|
||||
uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&(pPool->thread), &thattr, (void *) taosAcceptPeerTcpConnection, pPool) != 0) {
|
||||
if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) {
|
||||
uError("TCP server, failed to create accept thread, reason:%s", strerror(errno));
|
||||
close(pPool->acceptFd);
|
||||
free(pPool->pThread); free(pPool);
|
||||
free(pPool->pThread);
|
||||
free(pPool);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -89,29 +91,30 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
|
|||
}
|
||||
|
||||
void taosCloseTcpThreadPool(void *param) {
|
||||
SPoolObj *pPool = (SPoolObj *)param;
|
||||
SThreadObj *pThread;
|
||||
SPoolObj * pPool = (SPoolObj *)param;
|
||||
SThreadObj *pThread;
|
||||
|
||||
shutdown(pPool->acceptFd, SHUT_RD);
|
||||
shutdown(pPool->acceptFd, SHUT_RD);
|
||||
pthread_join(pPool->thread, NULL);
|
||||
|
||||
for (int i = 0; i < pPool->info.numOfThreads; ++i) {
|
||||
pThread = pPool->pThread[i];
|
||||
if (pThread) taosStopPoolThread(pThread);
|
||||
if (pThread) taosStopPoolThread(pThread);
|
||||
}
|
||||
|
||||
uDebug("%p TCP pool is closed", pPool);
|
||||
|
||||
taosTFree(pPool->pThread);
|
||||
free(pPool);
|
||||
uDebug("%p TCP pool is closed", pPool);
|
||||
}
|
||||
|
||||
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
|
||||
struct epoll_event event;
|
||||
SPoolObj *pPool = (SPoolObj *)param;
|
||||
|
||||
SConnObj *pConn = (SConnObj *) calloc(sizeof(SConnObj), 1);
|
||||
SConnObj *pConn = (SConnObj *)calloc(sizeof(SConnObj), 1);
|
||||
if (pConn == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -131,7 +134,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
|
|||
|
||||
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
||||
uError("failed to add fd:%d(%s)", connFd, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
free(pConn);
|
||||
pConn = NULL;
|
||||
} else {
|
||||
|
@ -143,8 +146,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
|
|||
}
|
||||
|
||||
void taosFreeTcpConn(void *param) {
|
||||
SConnObj * pConn = (SConnObj *)param;
|
||||
SThreadObj *pThread = pConn->pThread;
|
||||
SConnObj * pConn = (SConnObj *)param;
|
||||
SThreadObj *pThread = pConn->pThread;
|
||||
|
||||
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
|
||||
pConn->closedByApp = 1;
|
||||
|
@ -153,9 +156,9 @@ void taosFreeTcpConn(void *param) {
|
|||
|
||||
static void taosProcessBrokenLink(SConnObj *pConn) {
|
||||
SThreadObj *pThread = pConn->pThread;
|
||||
SPoolObj *pPool = pThread->pPool;
|
||||
SPoolInfo *pInfo = &pPool->info;
|
||||
|
||||
SPoolObj * pPool = pThread->pPool;
|
||||
SPoolInfo * pInfo = &pPool->info;
|
||||
|
||||
if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
|
||||
(*pInfo->processBrokenLink)(pConn->ahandle);
|
||||
|
||||
|
@ -169,24 +172,24 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
|
|||
#define maxEvents 10
|
||||
|
||||
static void *taosProcessTcpData(void *param) {
|
||||
SThreadObj *pThread = (SThreadObj *) param;
|
||||
SPoolObj *pPool = pThread->pPool;
|
||||
SPoolInfo *pInfo = &pPool->info;
|
||||
SConnObj *pConn = NULL;
|
||||
SThreadObj *pThread = (SThreadObj *)param;
|
||||
SPoolObj * pPool = pThread->pPool;
|
||||
SPoolInfo * pInfo = &pPool->info;
|
||||
SConnObj * pConn = NULL;
|
||||
struct epoll_event events[maxEvents];
|
||||
|
||||
void *buffer = malloc(pInfo->bufferSize);
|
||||
taosBlockSIGPIPE();
|
||||
|
||||
while (1) {
|
||||
if (pThread->stop) break;
|
||||
if (pThread->stop) break;
|
||||
int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
|
||||
if (pThread->stop) {
|
||||
uDebug("%p TCP epoll thread is exiting...", pThread);
|
||||
break;
|
||||
}
|
||||
|
||||
if (fdNum < 0) {
|
||||
if (fdNum < 0) {
|
||||
uError("epoll_wait failed (%s)", strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
@ -215,27 +218,28 @@ static void *taosProcessTcpData(void *param) {
|
|||
taosFreeTcpConn(pConn);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uDebug("%p TCP epoll thread exits", pThread);
|
||||
|
||||
close(pThread->pollFd);
|
||||
free(pThread);
|
||||
free(buffer);
|
||||
uDebug("%p TCP epoll thread exits", pThread);
|
||||
return NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *taosAcceptPeerTcpConnection(void *argv) {
|
||||
SPoolObj *pPool = (SPoolObj *)argv;
|
||||
SPoolInfo *pInfo = &pPool->info;
|
||||
SPoolObj * pPool = (SPoolObj *)argv;
|
||||
SPoolInfo *pInfo = &pPool->info;
|
||||
|
||||
taosBlockSIGPIPE();
|
||||
|
||||
while (1) {
|
||||
struct sockaddr_in clientAddr;
|
||||
socklen_t addrlen = sizeof(clientAddr);
|
||||
int connFd = accept(pPool->acceptFd, (struct sockaddr *) &clientAddr, &addrlen);
|
||||
int connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
|
||||
if (connFd < 0) {
|
||||
if (errno == EINVAL) {
|
||||
uDebug("%p TCP server accept is exiting...", pPool);
|
||||
|
@ -246,7 +250,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
|
|||
}
|
||||
}
|
||||
|
||||
//uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
|
||||
// uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
|
||||
taosKeepTcpAlive(connFd);
|
||||
(*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
|
||||
}
|
||||
|
@ -260,7 +264,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
|
|||
|
||||
if (pThread) return pThread;
|
||||
|
||||
pThread = (SThreadObj *) calloc(1, sizeof(SThreadObj));
|
||||
pThread = (SThreadObj *)calloc(1, sizeof(SThreadObj));
|
||||
if (pThread == NULL) return NULL;
|
||||
|
||||
pThread->pPool = pPool;
|
||||
|
@ -273,7 +277,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
|
|||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
int ret = pthread_create(&(pThread->thread), &thattr, (void *) taosProcessTcpData, pThread);
|
||||
int ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread);
|
||||
pthread_attr_destroy(&thattr);
|
||||
|
||||
if (ret != 0) {
|
||||
|
@ -290,20 +294,20 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
|
|||
return pThread;
|
||||
}
|
||||
|
||||
static void taosStopPoolThread(SThreadObj* pThread) {
|
||||
static void taosStopPoolThread(SThreadObj *pThread) {
|
||||
pThread->stop = true;
|
||||
|
||||
|
||||
if (pThread->thread == pthread_self()) {
|
||||
pthread_detach(pthread_self());
|
||||
return;
|
||||
}
|
||||
|
||||
// save thread ID into a local variable, since pThread is freed when the thread exits
|
||||
// save thread ID into a local variable, since pThread is freed when the thread exits
|
||||
pthread_t thread = pThread->thread;
|
||||
|
||||
// signal the thread to stop, try graceful method first,
|
||||
// and use pthread_cancel when failed
|
||||
struct epoll_event event = { .events = EPOLLIN };
|
||||
struct epoll_event event = {.events = EPOLLIN};
|
||||
eventfd_t fd = eventfd(1, 0);
|
||||
if (fd == -1) {
|
||||
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption
|
||||
|
@ -319,4 +323,3 @@ static void taosStopPoolThread(SThreadObj* pThread) {
|
|||
pthread_join(thread, NULL);
|
||||
taosClose(fd);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,29 +27,29 @@
|
|||
#include "tsync.h"
|
||||
#include "syncInt.h"
|
||||
|
||||
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
||||
static void arbProcessBrokenLink(void *param);
|
||||
static int arbProcessPeerMsg(void *param, void *buffer);
|
||||
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
||||
static void arbProcessBrokenLink(void *param);
|
||||
static int arbProcessPeerMsg(void *param, void *buffer);
|
||||
static tsem_t tsArbSem;
|
||||
static ttpool_h tsArbTcpPool;
|
||||
|
||||
typedef struct {
|
||||
char id[TSDB_EP_LEN+24];
|
||||
char id[TSDB_EP_LEN + 24];
|
||||
int nodeFd;
|
||||
void *pConn;
|
||||
} SNodeConn;
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
|
||||
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
tsArbitratorPort = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
debugFlag = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g")==0 && i < argc-1) {
|
||||
if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue;
|
||||
} else if (strcmp(argv[i], "-g") == 0 && i < argc - 1) {
|
||||
if (strlen(argv[++i]) > TSDB_FILENAME_LEN) continue;
|
||||
tstrncpy(arbLogPath, argv[i], sizeof(arbLogPath));
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
|
@ -62,8 +62,8 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
|
||||
sDebugFlag = debugFlag;
|
||||
|
||||
if (tsem_init(&tsArbSem, 0, 0) != 0) {
|
||||
|
||||
if (tsem_init(&tsArbSem, 0, 0) != 0) {
|
||||
printf("failed to create exit semphore\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
@ -91,10 +91,10 @@ int main(int argc, char *argv[]) {
|
|||
info.processIncomingMsg = arbProcessPeerMsg;
|
||||
info.processIncomingConn = arbProcessIncommingConnection;
|
||||
tsArbTcpPool = taosOpenTcpThreadPool(&info);
|
||||
|
||||
|
||||
if (tsArbTcpPool == NULL) {
|
||||
sDebug("failed to open TCP thread pool, exit...");
|
||||
return -1;
|
||||
sDebug("failed to open TCP thread pool, exit...");
|
||||
return -1;
|
||||
}
|
||||
|
||||
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort);
|
||||
|
@ -108,9 +108,8 @@ int main(int argc, char *argv[]) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
|
||||
{
|
||||
char ipstr[24];
|
||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
||||
char ipstr[24];
|
||||
tinet_ntoa(ipstr, sourceIp);
|
||||
sDebug("peer TCP connection from ip:%s", ipstr);
|
||||
|
||||
|
@ -121,15 +120,16 @@ static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
|
|||
return;
|
||||
}
|
||||
|
||||
SNodeConn *pNode = (SNodeConn *) calloc(sizeof(SNodeConn), 1);
|
||||
SNodeConn *pNode = (SNodeConn *)calloc(sizeof(SNodeConn), 1);
|
||||
if (pNode == NULL) {
|
||||
sError("failed to allocate memory(%s)", strerror(errno));
|
||||
taosCloseSocket(connFd);
|
||||
return;
|
||||
}
|
||||
|
||||
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
||||
if (firstPkt.syncHead.vgId) {
|
||||
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
|
||||
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
||||
if (firstPkt.syncHead.vgId) {
|
||||
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
|
||||
taosTFree(pNode);
|
||||
taosCloseSocket(connFd);
|
||||
|
@ -151,10 +151,10 @@ static void arbProcessBrokenLink(void *param) {
|
|||
}
|
||||
|
||||
static int arbProcessPeerMsg(void *param, void *buffer) {
|
||||
SNodeConn * pNode = param;
|
||||
SSyncHead head;
|
||||
int bytes = 0;
|
||||
char *cont = (char *)buffer;
|
||||
SNodeConn *pNode = param;
|
||||
SSyncHead head;
|
||||
int bytes = 0;
|
||||
char * cont = (char *)buffer;
|
||||
|
||||
int hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
|
||||
if (hlen != sizeof(head)) {
|
||||
|
|
|
@ -25,31 +25,32 @@ typedef struct {
|
|||
int num;
|
||||
int numOfReqs;
|
||||
int msgSize;
|
||||
tsem_t rspSem;
|
||||
tsem_t *pOverSem;
|
||||
tsem_t rspSem;
|
||||
tsem_t * pOverSem;
|
||||
pthread_t thread;
|
||||
void *pRpc;
|
||||
void * pRpc;
|
||||
} SInfo;
|
||||
|
||||
void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||
uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
||||
uDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
pMsg->code);
|
||||
|
||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
||||
tsem_post(&pInfo->rspSem);
|
||||
tsem_post(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
int tcount = 0;
|
||||
|
||||
void *sendRequest(void *param) {
|
||||
SInfo *pInfo = (SInfo *)param;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
SInfo * pInfo = (SInfo *)param;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
uDebug("thread:%d, start to send request", pInfo->index);
|
||||
|
||||
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||
pInfo->num++;
|
||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||
rpcMsg.contLen = pInfo->msgSize;
|
||||
|
@ -57,8 +58,9 @@ void *sendRequest(void *param) {
|
|||
rpcMsg.msgType = 1;
|
||||
uDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
|
||||
if ( pInfo->num % 20000 == 0 )
|
||||
if (pInfo->num % 20000 == 0) {
|
||||
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
}
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
|
@ -72,12 +74,12 @@ int main(int argc, char *argv[]) {
|
|||
SRpcInit rpcInit;
|
||||
SRpcEpSet epSet;
|
||||
char secret[TSDB_KEY_LEN] = "mypassword";
|
||||
int msgSize = 128;
|
||||
int numOfReqs = 0;
|
||||
int appThreads = 1;
|
||||
char serverIp[40] = "127.0.0.1";
|
||||
struct timeval systemTime;
|
||||
int64_t startTime, endTime;
|
||||
int msgSize = 128;
|
||||
int numOfReqs = 0;
|
||||
int appThreads = 1;
|
||||
char serverIp[40] = "127.0.0.1";
|
||||
struct timeval systemTime;
|
||||
int64_t startTime, endTime;
|
||||
pthread_attr_t thattr;
|
||||
|
||||
// server info
|
||||
|
@ -102,30 +104,30 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.spi = 1;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
epSet.port[0] = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
||||
strcpy(epSet.fqdn[0], argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||
tstrncpy(epSet.fqdn[0], argv[++i], TSDB_FQDN_LEN);
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
|
||||
numOfReqs = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
|
||||
appThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||
numOfReqs = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||
appThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||
rpcInit.user = argv[++i];
|
||||
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||
rpcInit.secret = argv[++i];
|
||||
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||
rpcInit.spi = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
|
@ -157,14 +159,14 @@ int main(int argc, char *argv[]) {
|
|||
uInfo("client is initialized");
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
||||
|
||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i=0; i<appThreads; ++i) {
|
||||
for (int i = 0; i < appThreads; ++i) {
|
||||
pInfo->index = i;
|
||||
pInfo->epSet = epSet;
|
||||
pInfo->numOfReqs = numOfReqs;
|
||||
|
@ -177,18 +179,16 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
do {
|
||||
usleep(1);
|
||||
} while ( tcount < appThreads);
|
||||
} while (tcount < appThreads);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||
float usedTime = (endTime - startTime)/1000.0; // mseconds
|
||||
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
float usedTime = (endTime - startTime) / 1000.0; // mseconds
|
||||
|
||||
uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
|
||||
uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
|
||||
uInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||
uInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -24,28 +24,27 @@
|
|||
#include "twal.h"
|
||||
#include "tsync.h"
|
||||
|
||||
int msgSize = 128;
|
||||
int commit = 0;
|
||||
int dataFd = -1;
|
||||
void *qhandle = NULL;
|
||||
int walNum = 0;
|
||||
int msgSize = 128;
|
||||
int commit = 0;
|
||||
int dataFd = -1;
|
||||
void * qhandle = NULL;
|
||||
int walNum = 0;
|
||||
uint64_t tversion = 0;
|
||||
void *syncHandle;
|
||||
int role;
|
||||
int nodeId;
|
||||
char path[256];
|
||||
int numOfWrites ;
|
||||
void * syncHandle;
|
||||
int role;
|
||||
int nodeId;
|
||||
char path[256];
|
||||
int numOfWrites;
|
||||
SSyncInfo syncInfo;
|
||||
SSyncCfg *pCfg;
|
||||
|
||||
int writeIntoWal(SWalHead *pHead)
|
||||
{
|
||||
int writeIntoWal(SWalHead *pHead) {
|
||||
if (dataFd < 0) {
|
||||
char walName[280];
|
||||
char walName[280];
|
||||
snprintf(walName, sizeof(walName), "%s/wal/wal.%d", path, walNum);
|
||||
remove(walName);
|
||||
dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (dataFd < 0) {
|
||||
(void)remove(walName);
|
||||
dataFd = open(walName, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (dataFd < 0) {
|
||||
uInfo("failed to open wal file:%s(%s)", walName, strerror(errno));
|
||||
return -1;
|
||||
} else {
|
||||
|
@ -67,54 +66,52 @@ int writeIntoWal(SWalHead *pHead)
|
|||
dataFd = -1;
|
||||
numOfWrites = 0;
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void confirmForward(void *ahandle, void *mhandle, int32_t code)
|
||||
{
|
||||
SRpcMsg *pMsg = (SRpcMsg *)mhandle;
|
||||
void confirmForward(void *ahandle, void *mhandle, int32_t code) {
|
||||
SRpcMsg * pMsg = (SRpcMsg *)mhandle;
|
||||
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
|
||||
|
||||
uDebug("ver:%" PRIu64 ", confirm is received", pHead->version);
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = rpcMallocCont(msgSize);
|
||||
rpcMsg.contLen = msgSize;
|
||||
rpcMsg.handle = pMsg->handle;
|
||||
rpcMsg.code = code;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
taosFreeQitem(mhandle);
|
||||
taosFreeQitem(mhandle);
|
||||
}
|
||||
|
||||
int processRpcMsg(void *item) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)item;
|
||||
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
|
||||
int code = -1;
|
||||
SRpcMsg * pMsg = (SRpcMsg *)item;
|
||||
SWalHead *pHead = (SWalHead *)(((char *)pMsg->pCont) - sizeof(SWalHead));
|
||||
int code = -1;
|
||||
|
||||
if (role != TAOS_SYNC_ROLE_MASTER) {
|
||||
uError("not master, write failed, role:%s", syncRole[role]);
|
||||
} else {
|
||||
|
||||
pHead->version = ++tversion;
|
||||
pHead->msgType = pMsg->msgType;
|
||||
pHead->len = pMsg->contLen;
|
||||
|
||||
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version);
|
||||
writeIntoWal(pHead);
|
||||
writeIntoWal(pHead);
|
||||
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
|
||||
|
||||
code = 0;
|
||||
}
|
||||
|
||||
if (pCfg->quorum <= 1) {
|
||||
taosFreeQitem(item);
|
||||
if (pCfg->quorum <= 1) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(item);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = rpcMallocCont(msgSize);
|
||||
rpcMsg.contLen = msgSize;
|
||||
rpcMsg.handle = pMsg->handle;
|
||||
|
@ -126,7 +123,6 @@ int processRpcMsg(void *item) {
|
|||
}
|
||||
|
||||
int processFwdMsg(void *item) {
|
||||
|
||||
SWalHead *pHead = (SWalHead *)item;
|
||||
|
||||
if (pHead->version <= tversion) {
|
||||
|
@ -142,11 +138,11 @@ int processFwdMsg(void *item) {
|
|||
|
||||
// write into cache
|
||||
|
||||
/*
|
||||
if (pHead->handle) {
|
||||
syncSendFwdAck(syncHandle, pHead->handle, 0);
|
||||
}
|
||||
*/
|
||||
/*
|
||||
if (pHead->handle) {
|
||||
syncSendFwdAck(syncHandle, pHead->handle, 0);
|
||||
}
|
||||
*/
|
||||
|
||||
taosFreeQitem(item);
|
||||
|
||||
|
@ -154,7 +150,6 @@ int processFwdMsg(void *item) {
|
|||
}
|
||||
|
||||
int processWalMsg(void *item) {
|
||||
|
||||
SWalHead *pHead = (SWalHead *)item;
|
||||
|
||||
if (pHead->version <= tversion) {
|
||||
|
@ -168,11 +163,11 @@ int processWalMsg(void *item) {
|
|||
|
||||
// write into cache
|
||||
|
||||
/*
|
||||
if (pHead->handle) {
|
||||
syncSendFwdAck(syncHandle, pHead->handle, 0);
|
||||
}
|
||||
*/
|
||||
/*
|
||||
if (pHead->handle) {
|
||||
syncSendFwdAck(syncHandle, pHead->handle, 0);
|
||||
}
|
||||
*/
|
||||
|
||||
taosFreeQitem(item);
|
||||
|
||||
|
@ -180,15 +175,15 @@ int processWalMsg(void *item) {
|
|||
}
|
||||
|
||||
void *processWriteQueue(void *param) {
|
||||
int type;
|
||||
void *item;
|
||||
int type;
|
||||
void *item;
|
||||
|
||||
while (1) {
|
||||
int ret = taosReadQitem(qhandle, &type, &item);
|
||||
if (ret <= 0) {
|
||||
usleep(1000);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
processRpcMsg(item);
|
||||
|
@ -196,8 +191,7 @@ void *processWriteQueue(void *param) {
|
|||
processWalMsg(item);
|
||||
} else if (type == TAOS_QTYPE_FWD) {
|
||||
processFwdMsg(item);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -224,21 +218,19 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
|
|||
}
|
||||
|
||||
void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
|
||||
uDebug("request is received, type:%d, len:%d", pMsg->msgType, pMsg->contLen);
|
||||
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
|
||||
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
|
||||
}
|
||||
|
||||
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion)
|
||||
{
|
||||
uint32_t magic;
|
||||
struct stat fstat;
|
||||
char aname[280];
|
||||
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
|
||||
uint32_t magic;
|
||||
struct stat fstat;
|
||||
char aname[280];
|
||||
|
||||
if (*index == 2) {
|
||||
uInfo("wait for a while .....");
|
||||
|
@ -246,15 +238,15 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
|
|||
}
|
||||
|
||||
if (name[0] == 0) {
|
||||
// find the file
|
||||
// find the file
|
||||
snprintf(aname, sizeof(aname), "%s/data/data.%d", path, *index);
|
||||
sprintf(name, "data/data.%d", *index);
|
||||
sprintf(name, "data/data.%d", *index);
|
||||
} else {
|
||||
snprintf(aname, sizeof(aname), "%s/%s", path, name);
|
||||
}
|
||||
|
||||
uInfo("get file info:%s", aname);
|
||||
if ( stat(aname, &fstat) < 0 ) return 0;
|
||||
if (stat(aname, &fstat) < 0) return 0;
|
||||
|
||||
*size = fstat.st_size;
|
||||
magic = fstat.st_size;
|
||||
|
@ -262,24 +254,22 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
|
|||
return magic;
|
||||
}
|
||||
|
||||
int getWalInfo(void *ahandle, char *name, uint32_t *index) {
|
||||
|
||||
struct stat fstat;
|
||||
char aname[280];
|
||||
int getWalInfo(void *ahandle, char *name, uint32_t *index) {
|
||||
struct stat fstat;
|
||||
char aname[280];
|
||||
|
||||
name[0] = 0;
|
||||
if (*index + 1> walNum) return 0;
|
||||
if (*index + 1 > walNum) return 0;
|
||||
|
||||
snprintf(aname, sizeof(aname), "%s/wal/wal.%d", path, *index);
|
||||
sprintf(name, "wal/wal.%d", *index);
|
||||
sprintf(name, "wal/wal.%d", *index);
|
||||
uInfo("get wal info:%s", aname);
|
||||
|
||||
if ( stat(aname, &fstat) < 0 ) return -1;
|
||||
if (stat(aname, &fstat) < 0) return -1;
|
||||
|
||||
if (*index >= walNum-1) return 0; // no more
|
||||
if (*index >= walNum - 1) return 0; // no more
|
||||
|
||||
return 1;
|
||||
|
||||
}
|
||||
|
||||
int writeToCache(void *ahandle, void *data, int type) {
|
||||
|
@ -290,24 +280,19 @@ int writeToCache(void *ahandle, void *data, int type) {
|
|||
int msgSize = pHead->len + sizeof(SWalHead);
|
||||
void *pMsg = taosAllocateQitem(msgSize);
|
||||
memcpy(pMsg, pHead, msgSize);
|
||||
taosWriteQitem(qhandle, type, pMsg);
|
||||
taosWriteQitem(qhandle, type, pMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void confirmFwd(void *ahandle, int64_t version) {
|
||||
|
||||
return;
|
||||
}
|
||||
void confirmFwd(void *ahandle, int64_t version) { return; }
|
||||
|
||||
void notifyRole(void *ahandle, int8_t r) {
|
||||
role = r;
|
||||
printf("current role:%s\n", syncRole[role]);
|
||||
}
|
||||
|
||||
|
||||
void initSync() {
|
||||
|
||||
pCfg->replica = 1;
|
||||
pCfg->quorum = 1;
|
||||
syncInfo.vgId = 1;
|
||||
|
@ -339,20 +324,18 @@ void initSync() {
|
|||
taosGetFqdn(pCfg->nodeInfo[4].nodeFqdn);
|
||||
}
|
||||
|
||||
void doSync()
|
||||
{
|
||||
for (int i=0; i<5; ++i) {
|
||||
if (tsSyncPort == pCfg->nodeInfo[i].nodePort)
|
||||
nodeId = pCfg->nodeInfo[i].nodeId;
|
||||
void doSync() {
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
if (tsSyncPort == pCfg->nodeInfo[i].nodePort) nodeId = pCfg->nodeInfo[i].nodeId;
|
||||
}
|
||||
|
||||
snprintf(path, sizeof(path), "/root/test/d%d", nodeId);
|
||||
strcpy(syncInfo.path, path);
|
||||
tstrncpy(syncInfo.path, path, sizeof(syncInfo.path));
|
||||
|
||||
if ( syncHandle == NULL) {
|
||||
syncHandle = syncStart(&syncInfo);
|
||||
if (syncHandle == NULL) {
|
||||
syncHandle = syncStart(&syncInfo);
|
||||
} else {
|
||||
if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL;
|
||||
if (syncReconfig(syncHandle, pCfg) < 0) syncHandle = NULL;
|
||||
}
|
||||
|
||||
uInfo("nodeId:%d path:%s syncPort:%d", nodeId, path, tsSyncPort);
|
||||
|
@ -361,39 +344,39 @@ void doSync()
|
|||
int main(int argc, char *argv[]) {
|
||||
SRpcInit rpcInit;
|
||||
char dataName[20] = "server.data";
|
||||
pCfg = &syncInfo.syncCfg;
|
||||
pCfg = &syncInfo.syncCfg;
|
||||
|
||||
initSync();
|
||||
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 7000;
|
||||
rpcInit.label = "SER";
|
||||
rpcInit.localPort = 7000;
|
||||
rpcInit.label = "SER";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.sessions = 1000;
|
||||
rpcInit.idleTime = tsShellActivityTimer*1500;
|
||||
rpcInit.afp = retrieveAuthInfo;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.sessions = 1000;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1500;
|
||||
rpcInit.afp = retrieveAuthInfo;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
rpcInit.localPort = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) {
|
||||
commit = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-v") == 0 && i < argc - 1) {
|
||||
syncInfo.version = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-r") == 0 && i < argc - 1) {
|
||||
pCfg->replica = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-q")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-q") == 0 && i < argc - 1) {
|
||||
pCfg->quorum = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
|
@ -411,10 +394,10 @@ int main(int argc, char *argv[]) {
|
|||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
uDebugFlag = rpcDebugFlag;
|
||||
dDebugFlag = rpcDebugFlag;
|
||||
//tmrDebugFlag = rpcDebugFlag;
|
||||
dDebugFlag = rpcDebugFlag;
|
||||
// tmrDebugFlag = rpcDebugFlag;
|
||||
tsAsyncLog = 0;
|
||||
taosInitLog("server.log", 1000000, 10);
|
||||
|
||||
|
@ -443,35 +426,39 @@ int main(int argc, char *argv[]) {
|
|||
SNodesRole nroles;
|
||||
|
||||
while (1) {
|
||||
char c = getchar();
|
||||
int c = getchar();
|
||||
|
||||
switch(c) {
|
||||
switch (c) {
|
||||
case '1':
|
||||
pCfg->replica = 1; doSync();
|
||||
break;
|
||||
pCfg->replica = 1;
|
||||
doSync();
|
||||
break;
|
||||
case '2':
|
||||
pCfg->replica = 2; doSync();
|
||||
pCfg->replica = 2;
|
||||
doSync();
|
||||
break;
|
||||
case '3':
|
||||
pCfg->replica = 3; doSync();
|
||||
pCfg->replica = 3;
|
||||
doSync();
|
||||
break;
|
||||
case '4':
|
||||
pCfg->replica = 4; doSync();
|
||||
pCfg->replica = 4;
|
||||
doSync();
|
||||
break;
|
||||
case '5':
|
||||
pCfg->replica = 5; doSync();
|
||||
pCfg->replica = 5;
|
||||
doSync();
|
||||
break;
|
||||
case 's':
|
||||
syncGetNodesRole(syncHandle, &nroles);
|
||||
for (int i=0; i<pCfg->replica; ++i)
|
||||
for (int i = 0; i < pCfg->replica; ++i)
|
||||
printf("=== nodeId:%d role:%s\n", nroles.nodeId[i], syncRole[nroles.role[i]]);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (c=='q') break;
|
||||
|
||||
if (c == 'q') break;
|
||||
}
|
||||
|
||||
syncStop(syncHandle);
|
||||
|
@ -483,5 +470,3 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue