Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
This commit is contained in:
commit
c30d9b605b
|
@ -16,11 +16,13 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "tmqtt.h"
|
#include "tmqtt.h"
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
|
#include "dnode.h"
|
||||||
#include "dnodeInt.h"
|
#include "dnodeInt.h"
|
||||||
#include "dnodeModule.h"
|
#include "dnodeModule.h"
|
||||||
|
|
||||||
|
@ -129,17 +131,32 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
||||||
for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) {
|
for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) {
|
||||||
bool enableModule = moduleStatus & (1 << module);
|
bool enableModule = moduleStatus & (1 << module);
|
||||||
if (!tsModule[module].enable && enableModule) {
|
if (!tsModule[module].enable && enableModule) {
|
||||||
dInfo("module status:%u is received, start %s module", tsModuleStatus, tsModule[module].name);
|
dInfo("module status:%u is set, start %s module", moduleStatus, tsModule[module].name);
|
||||||
tsModule[module].enable = true;
|
tsModule[module].enable = true;
|
||||||
dnodeSetModuleStatus(module);
|
dnodeSetModuleStatus(module);
|
||||||
(*tsModule[module].startFp)();
|
(*tsModule[module].startFp)();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsModule[module].enable && !enableModule) {
|
if (tsModule[module].enable && !enableModule) {
|
||||||
dInfo("module status:%u is received, stop %s module", tsModuleStatus, tsModule[module].name);
|
dInfo("module status:%u is set, stop %s module", moduleStatus, tsModule[module].name);
|
||||||
tsModule[module].enable = false;
|
tsModule[module].enable = false;
|
||||||
dnodeUnSetModuleStatus(module);
|
dnodeUnSetModuleStatus(module);
|
||||||
(*tsModule[module].stopFp)();
|
(*tsModule[module].stopFp)();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dnodeCheckModules() {
|
||||||
|
if (tsModuleStatus & TSDB_MOD_MNODE) return;
|
||||||
|
|
||||||
|
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
||||||
|
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
||||||
|
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
|
||||||
|
if (node->nodeId == dnodeGetDnodeId()) {
|
||||||
|
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);;
|
||||||
|
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
|
||||||
|
dnodeProcessModuleStatus(moduleStatus);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
|
||||||
void dnodeGetMnodeEpSetForShell(void *epSet);
|
void dnodeGetMnodeEpSetForShell(void *epSet);
|
||||||
void * dnodeGetMnodeInfos();
|
void * dnodeGetMnodeInfos();
|
||||||
int32_t dnodeGetDnodeId();
|
int32_t dnodeGetDnodeId();
|
||||||
|
void dnodeCheckModules();
|
||||||
|
|
||||||
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||||
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
||||||
|
|
|
@ -103,6 +103,9 @@ typedef struct {
|
||||||
|
|
||||||
typedef void* tsync_h;
|
typedef void* tsync_h;
|
||||||
|
|
||||||
|
int32_t syncInit();
|
||||||
|
void syncCleanUp();
|
||||||
|
|
||||||
tsync_h syncStart(const SSyncInfo *);
|
tsync_h syncStart(const SSyncInfo *);
|
||||||
void syncStop(tsync_h shandle);
|
void syncStop(tsync_h shandle);
|
||||||
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
|
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
#include "tsync.h"
|
#include "tsync.h"
|
||||||
|
#include "ttimer.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
@ -88,6 +89,8 @@ typedef struct {
|
||||||
SSdbWriteWorker *writeWorker;
|
SSdbWriteWorker *writeWorker;
|
||||||
} SSdbWriteWorkerPool;
|
} SSdbWriteWorkerPool;
|
||||||
|
|
||||||
|
extern void * tsMnodeTmr;
|
||||||
|
static void * tsUpdateSyncTmr;
|
||||||
static SSdbObject tsSdbObj = {0};
|
static SSdbObject tsSdbObj = {0};
|
||||||
static taos_qset tsSdbWriteQset;
|
static taos_qset tsSdbWriteQset;
|
||||||
static taos_qall tsSdbWriteQall;
|
static taos_qall tsSdbWriteQall;
|
||||||
|
@ -290,11 +293,16 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
||||||
taosFreeQitem(pOper);
|
taosFreeQitem(pOper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); }
|
||||||
|
|
||||||
void sdbUpdateSync() {
|
void sdbUpdateSync() {
|
||||||
if (!mnodeIsRunning()) {
|
if (!mnodeIsRunning()) {
|
||||||
mDebug("mnode not start yet, update sync info later");
|
mDebug("mnode not start yet, update sync info later");
|
||||||
|
dnodeCheckModules();
|
||||||
|
taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
mDebug("update sync info in sdb");
|
||||||
|
|
||||||
SSyncCfg syncCfg = {0};
|
SSyncCfg syncCfg = {0};
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
|
@ -387,8 +395,6 @@ int32_t sdbInit() {
|
||||||
tsSdbObj.role = TAOS_SYNC_ROLE_MASTER;
|
tsSdbObj.role = TAOS_SYNC_ROLE_MASTER;
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbUpdateSync();
|
|
||||||
|
|
||||||
tsSdbObj.status = SDB_STATUS_SERVING;
|
tsSdbObj.status = SDB_STATUS_SERVING;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,18 +35,14 @@ int tsSyncTcpThreads = 2;
|
||||||
int tsMaxWatchFiles = 500;
|
int tsMaxWatchFiles = 500;
|
||||||
int tsMaxFwdInfo = 200;
|
int tsMaxFwdInfo = 200;
|
||||||
int tsSyncTimer = 1;
|
int tsSyncTimer = 1;
|
||||||
//int sDebugFlag = 135;
|
|
||||||
//char tsArbitrator[TSDB_FQDN_LEN] = {0};
|
|
||||||
|
|
||||||
// module global, not configurable
|
// module global, not configurable
|
||||||
int tsSyncNum; // number of sync in process in whole system
|
int tsSyncNum; // number of sync in process in whole system
|
||||||
char tsNodeFqdn[TSDB_FQDN_LEN];
|
char tsNodeFqdn[TSDB_FQDN_LEN];
|
||||||
|
|
||||||
static int tsNodeNum; // number of nodes in system
|
|
||||||
static ttpool_h tsTcpPool;
|
static ttpool_h tsTcpPool;
|
||||||
static void *syncTmrCtrl = NULL;
|
static void * syncTmrCtrl = NULL;
|
||||||
static void *vgIdHash;
|
static void * vgIdHash;
|
||||||
static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT;
|
|
||||||
|
|
||||||
// local functions
|
// local functions
|
||||||
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
|
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
|
||||||
|
@ -75,7 +71,7 @@ char* syncRole[] = {
|
||||||
"master"
|
"master"
|
||||||
};
|
};
|
||||||
|
|
||||||
static void syncModuleInitFunc() {
|
int32_t syncInit() {
|
||||||
SPoolInfo info;
|
SPoolInfo info;
|
||||||
|
|
||||||
info.numOfThreads = tsSyncTcpThreads;
|
info.numOfThreads = tsSyncTcpThreads;
|
||||||
|
@ -87,25 +83,52 @@ static void syncModuleInitFunc() {
|
||||||
info.processIncomingConn = syncProcessIncommingConnection;
|
info.processIncomingConn = syncProcessIncommingConnection;
|
||||||
|
|
||||||
tsTcpPool = taosOpenTcpThreadPool(&info);
|
tsTcpPool = taosOpenTcpThreadPool(&info);
|
||||||
if (tsTcpPool == NULL) return;
|
if (tsTcpPool == NULL) {
|
||||||
|
sError("failed to init tcpPool");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
|
syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
|
||||||
if (syncTmrCtrl == NULL) {
|
if (syncTmrCtrl == NULL) {
|
||||||
|
sError("failed to init tmrCtrl");
|
||||||
taosCloseTcpThreadPool(tsTcpPool);
|
taosCloseTcpThreadPool(tsTcpPool);
|
||||||
tsTcpPool = NULL;
|
tsTcpPool = NULL;
|
||||||
return;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
|
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
|
||||||
if (vgIdHash == NULL) {
|
if (vgIdHash == NULL) {
|
||||||
|
sError("failed to init vgIdHash");
|
||||||
taosTmrCleanUp(syncTmrCtrl);
|
taosTmrCleanUp(syncTmrCtrl);
|
||||||
taosCloseTcpThreadPool(tsTcpPool);
|
taosCloseTcpThreadPool(tsTcpPool);
|
||||||
tsTcpPool = NULL;
|
tsTcpPool = NULL;
|
||||||
syncTmrCtrl = NULL;
|
syncTmrCtrl = NULL;
|
||||||
return;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
|
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
|
||||||
|
sInfo("sync module initialized successfully");
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncCleanUp() {
|
||||||
|
if (tsTcpPool) {
|
||||||
|
taosCloseTcpThreadPool(tsTcpPool);
|
||||||
|
tsTcpPool = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncTmrCtrl) {
|
||||||
|
taosTmrCleanUp(syncTmrCtrl);
|
||||||
|
syncTmrCtrl = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vgIdHash) {
|
||||||
|
taosHashCleanup(vgIdHash);
|
||||||
|
vgIdHash = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
sInfo("sync module is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
void *syncStart(const SSyncInfo *pInfo) {
|
void *syncStart(const SSyncInfo *pInfo) {
|
||||||
|
@ -118,15 +141,6 @@ void *syncStart(const SSyncInfo *pInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_once(&syncModuleInit, syncModuleInitFunc);
|
|
||||||
if (tsTcpPool == NULL) {
|
|
||||||
free(pNode);
|
|
||||||
syncModuleInit = PTHREAD_ONCE_INIT;
|
|
||||||
sError("failed to init sync module(%s)", tstrerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_add_fetch_32(&tsNodeNum, 1);
|
|
||||||
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
|
tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path));
|
||||||
pthread_mutex_init(&pNode->mutex, NULL);
|
pthread_mutex_init(&pNode->mutex, NULL);
|
||||||
|
|
||||||
|
@ -148,9 +162,10 @@ void *syncStart(const SSyncInfo *pInfo) {
|
||||||
for (int i = 0; i < pCfg->replica; ++i) {
|
for (int i = 0; i < pCfg->replica; ++i) {
|
||||||
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
|
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
|
||||||
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
|
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
|
||||||
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort))
|
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
|
||||||
pNode->selfIndex = i;
|
pNode->selfIndex = i;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pNode->selfIndex < 0) {
|
if (pNode->selfIndex < 0) {
|
||||||
sInfo("vgId:%d, this node is not configured", pNode->vgId);
|
sInfo("vgId:%d, this node is not configured", pNode->vgId);
|
||||||
|
@ -182,14 +197,15 @@ void *syncStart(const SSyncInfo *pInfo) {
|
||||||
syncAddNodeRef(pNode);
|
syncAddNodeRef(pNode);
|
||||||
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
|
taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *));
|
||||||
|
|
||||||
if (pNode->notifyRole)
|
if (pNode->notifyRole) {
|
||||||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||||
|
}
|
||||||
|
|
||||||
return pNode;
|
return pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncStop(void *param) {
|
void syncStop(void *param) {
|
||||||
SSyncNode * pNode = param;
|
SSyncNode *pNode = param;
|
||||||
SSyncPeer *pPeer;
|
SSyncPeer *pPeer;
|
||||||
|
|
||||||
if (pNode == NULL) return;
|
if (pNode == NULL) return;
|
||||||
|
@ -214,12 +230,12 @@ void syncStop(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
|
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
|
||||||
SSyncNode * pNode = param;
|
SSyncNode *pNode = param;
|
||||||
int i, j;
|
int i, j;
|
||||||
|
|
||||||
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
|
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
|
||||||
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole],
|
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
|
||||||
pNewCfg->replica, pNode->replica);
|
pNode->replica);
|
||||||
|
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
|
|
||||||
|
@ -252,17 +268,19 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
|
||||||
newPeers[i] = pNode->peerInfo[j];
|
newPeers[i] = pNode->peerInfo[j];
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort))
|
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
|
||||||
pNode->selfIndex = i;
|
pNode->selfIndex = i;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pNode->replica = pNewCfg->replica;
|
pNode->replica = pNewCfg->replica;
|
||||||
pNode->quorum = pNewCfg->quorum;
|
pNode->quorum = pNewCfg->quorum;
|
||||||
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
|
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
|
||||||
memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);
|
memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica);
|
||||||
|
|
||||||
for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i)
|
for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) {
|
||||||
pNode->peerInfo[i] = NULL;
|
pNode->peerInfo[i] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
syncAddArbitrator(pNode);
|
syncAddArbitrator(pNode);
|
||||||
|
|
||||||
|
@ -274,17 +292,18 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&(pNode->mutex));
|
pthread_mutex_unlock(&(pNode->mutex));
|
||||||
|
|
||||||
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]);
|
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
|
||||||
|
syncRole[nodeRole]);
|
||||||
syncBroadcastStatus(pNode);
|
syncBroadcastStatus(pNode);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
||||||
SSyncNode * pNode = param;
|
SSyncNode *pNode = param;
|
||||||
SSyncPeer * pPeer;
|
SSyncPeer *pPeer;
|
||||||
SSyncHead *pSyncHead;
|
SSyncHead *pSyncHead;
|
||||||
SWalHead *pWalHead = data;
|
SWalHead * pWalHead = data;
|
||||||
int fwdLen;
|
int fwdLen;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
|
@ -292,23 +311,23 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
||||||
|
|
||||||
// always update version
|
// always update version
|
||||||
nodeVersion = pWalHead->version;
|
nodeVersion = pWalHead->version;
|
||||||
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0;
|
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
||||||
|
|
||||||
// only pkt from RPC or CQ can be forwarded
|
// only pkt from RPC or CQ can be forwarded
|
||||||
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
|
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
|
||||||
|
|
||||||
// a hacker way to improve the performance
|
// a hacker way to improve the performance
|
||||||
pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead));
|
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
|
||||||
pSyncHead->type = TAOS_SMSG_FORWARD;
|
pSyncHead->type = TAOS_SMSG_FORWARD;
|
||||||
pSyncHead->pversion = 0;
|
pSyncHead->pversion = 0;
|
||||||
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
|
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
|
||||||
fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head
|
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
|
||||||
|
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
pPeer = pNode->peerInfo[i];
|
pPeer = pNode->peerInfo[i];
|
||||||
if (pPeer == NULL || pPeer->peerFd <0) continue;
|
if (pPeer == NULL || pPeer->peerFd < 0) continue;
|
||||||
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
|
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
|
||||||
|
|
||||||
if (pNode->quorum > 1 && code == 0) {
|
if (pNode->quorum > 1 && code == 0) {
|
||||||
|
@ -340,7 +359,7 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) {
|
||||||
|
|
||||||
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
|
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
|
||||||
|
|
||||||
SSyncHead *pHead = (SSyncHead *) msg;
|
SSyncHead *pHead = (SSyncHead *)msg;
|
||||||
pHead->type = TAOS_SMSG_FORWARD_RSP;
|
pHead->type = TAOS_SMSG_FORWARD_RSP;
|
||||||
pHead->len = sizeof(SFwdRsp);
|
pHead->len = sizeof(SFwdRsp);
|
||||||
|
|
||||||
|
@ -373,7 +392,7 @@ void syncRecover(void *param) {
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
pPeer = (SSyncPeer *) pNode->peerInfo[i];
|
pPeer = (SSyncPeer *)pNode->peerInfo[i];
|
||||||
if (pPeer->peerFd >= 0) {
|
if (pPeer->peerFd >= 0) {
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
}
|
}
|
||||||
|
@ -386,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
|
||||||
SSyncNode *pNode = param;
|
SSyncNode *pNode = param;
|
||||||
|
|
||||||
pNodesRole->selfIndex = pNode->selfIndex;
|
pNodesRole->selfIndex = pNode->selfIndex;
|
||||||
for (int i=0; i<pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
|
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
|
||||||
pNodesRole->role[i] = pNode->peerInfo[i]->role;
|
pNodesRole->role[i] = pNode->peerInfo[i]->role;
|
||||||
}
|
}
|
||||||
|
@ -423,29 +442,16 @@ static void syncAddArbitrator(SSyncNode *pNode) {
|
||||||
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
|
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncAddNodeRef(SSyncNode *pNode)
|
static void syncAddNodeRef(SSyncNode *pNode) {
|
||||||
{
|
|
||||||
atomic_add_fetch_8(&pNode->refCount, 1);
|
atomic_add_fetch_8(&pNode->refCount, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncDecNodeRef(SSyncNode *pNode)
|
static void syncDecNodeRef(SSyncNode *pNode) {
|
||||||
{
|
|
||||||
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
|
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
|
||||||
pthread_mutex_destroy(&pNode->mutex);
|
pthread_mutex_destroy(&pNode->mutex);
|
||||||
taosTFree(pNode->pRecv);
|
taosTFree(pNode->pRecv);
|
||||||
taosTFree(pNode->pSyncFwds);
|
taosTFree(pNode->pSyncFwds);
|
||||||
taosTFree(pNode);
|
taosTFree(pNode);
|
||||||
|
|
||||||
if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) {
|
|
||||||
if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool);
|
|
||||||
if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl);
|
|
||||||
if (vgIdHash) taosHashCleanup(vgIdHash);
|
|
||||||
syncTmrCtrl = NULL;
|
|
||||||
tsTcpPool = NULL;
|
|
||||||
vgIdHash = NULL;
|
|
||||||
syncModuleInit = PTHREAD_ONCE_INIT;
|
|
||||||
sDebug("sync module is cleaned up");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,7 +493,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
||||||
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
|
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
|
||||||
if (ip == -1) return NULL;
|
if (ip == -1) return NULL;
|
||||||
|
|
||||||
SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer));
|
SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer));
|
||||||
if (pPeer == NULL) return NULL;
|
if (pPeer == NULL) return NULL;
|
||||||
|
|
||||||
pPeer->nodeId = pInfo->nodeId;
|
pPeer->nodeId = pInfo->nodeId;
|
||||||
|
@ -506,7 +512,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
||||||
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
||||||
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
||||||
sDebug("%s, start to check peer connection", pPeer->id);
|
sDebug("%s, start to check peer connection", pPeer->id);
|
||||||
taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer);
|
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
|
||||||
|
if (pNode->vgId) checkMs = tsStatusInterval * 2000 + 100;
|
||||||
|
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAddNodeRef(pNode);
|
syncAddNodeRef(pNode);
|
||||||
|
@ -542,18 +550,20 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
||||||
sDebug("vgId:%d, choose master", pNode->vgId);
|
sDebug("vgId:%d, choose master", pNode->vgId);
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
|
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
|
||||||
onlineNum++;
|
onlineNum++;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (onlineNum == pNode->replica) {
|
if (onlineNum == pNode->replica) {
|
||||||
// if all peers are online, peer with highest version shall be master
|
// if all peers are online, peer with highest version shall be master
|
||||||
index = 0;
|
index = 0;
|
||||||
for (int i = 1; i < pNode->replica; ++i) {
|
for (int i = 1; i < pNode->replica; ++i) {
|
||||||
if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version)
|
if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) {
|
||||||
index = i;
|
index = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add arbitrator connection
|
// add arbitrator connection
|
||||||
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
|
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
|
||||||
|
@ -568,11 +578,12 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
||||||
//slave with highest version shall be master
|
//slave with highest version shall be master
|
||||||
pPeer = pNode->peerInfo[i];
|
pPeer = pNode->peerInfo[i];
|
||||||
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
|
if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version)
|
if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) {
|
||||||
index = i;
|
index = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
if (index == pNode->selfIndex) {
|
if (index == pNode->selfIndex) {
|
||||||
|
@ -595,9 +606,10 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
|
||||||
int replica = pNode->replica;
|
int replica = pNode->replica;
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE)
|
if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) {
|
||||||
onlineNum++;
|
onlineNum++;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add arbitrator connection
|
// add arbitrator connection
|
||||||
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
|
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
|
||||||
|
@ -644,7 +656,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
||||||
for (int i = 0; i < pNode->replica; ++i) {
|
for (int i = 0; i < pNode->replica; ++i) {
|
||||||
if ( i == pNode->selfIndex ) continue;
|
if (i == pNode->selfIndex) continue;
|
||||||
syncRestartPeer(pNode->peerInfo[i]);
|
syncRestartPeer(pNode->peerInfo[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -661,12 +673,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
|
||||||
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
pNode->peerInfo[pNode->selfIndex]->version = nodeVersion;
|
||||||
pPeer->role = newRole;
|
pPeer->role = newRole;
|
||||||
|
|
||||||
sDebug("%s, own role:%s, new peer role:%s", pPeer->id,
|
sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]);
|
||||||
syncRole[nodeRole], syncRole[pPeer->role]);
|
|
||||||
|
|
||||||
SSyncPeer *pMaster = syncCheckMaster(pNode);
|
SSyncPeer *pMaster = syncCheckMaster(pNode);
|
||||||
|
|
||||||
if ( pMaster ) {
|
if (pMaster) {
|
||||||
// master is there
|
// master is there
|
||||||
pNode->pMaster = pMaster;
|
pNode->pMaster = pMaster;
|
||||||
sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version);
|
sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version);
|
||||||
|
@ -699,19 +710,22 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
|
||||||
if (pNode->replica == 2) consistent = 1;
|
if (pNode->replica == 2) consistent = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (consistent)
|
if (consistent) {
|
||||||
syncChooseMaster(pNode);
|
syncChooseMaster(pNode);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (syncRequired) {
|
if (syncRequired) {
|
||||||
syncRecoverFromMaster(pMaster);
|
syncRecoverFromMaster(pMaster);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (peerOldRole != newRole || nodeRole != selfOldRole)
|
if (peerOldRole != newRole || nodeRole != selfOldRole) {
|
||||||
syncBroadcastStatus(pNode);
|
syncBroadcastStatus(pNode);
|
||||||
|
}
|
||||||
|
|
||||||
if (nodeRole != TAOS_SYNC_ROLE_MASTER)
|
if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
|
||||||
syncResetFlowCtrl(pNode);
|
syncResetFlowCtrl(pNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncRestartPeer(SSyncPeer *pPeer) {
|
static void syncRestartPeer(SSyncPeer *pPeer) {
|
||||||
|
@ -722,8 +736,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
|
||||||
pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
|
pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
|
||||||
|
|
||||||
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
||||||
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort))
|
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
|
||||||
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRestartConnection(SSyncPeer *pPeer) {
|
void syncRestartConnection(SSyncPeer *pPeer) {
|
||||||
|
@ -805,7 +820,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("%s, try to sync", pPeer->id)
|
sDebug("%s, try to sync", pPeer->id);
|
||||||
|
|
||||||
SFirstPkt firstPkt;
|
SFirstPkt firstPkt;
|
||||||
memset(&firstPkt, 0, sizeof(firstPkt));
|
memset(&firstPkt, 0, sizeof(firstPkt));
|
||||||
|
@ -814,31 +829,29 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
|
||||||
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
|
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
|
||||||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||||
firstPkt.port = tsSyncPort;
|
firstPkt.port = tsSyncPort;
|
||||||
taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
|
|
||||||
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) {
|
if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
|
||||||
sError("%s, failed to send sync-req to peer", pPeer->id);
|
sError("%s, failed to send sync-req to peer", pPeer->id);
|
||||||
} else {
|
} else {
|
||||||
nodeSStatus = TAOS_SYNC_STATUS_START;
|
nodeSStatus = TAOS_SYNC_STATUS_START;
|
||||||
sInfo("%s, sync-req is sent", pPeer->id);
|
sInfo("%s, sync-req is sent", pPeer->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode * pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
|
SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
SFwdInfo *pFwdInfo;
|
SFwdInfo * pFwdInfo;
|
||||||
|
|
||||||
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
|
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
|
||||||
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
||||||
|
|
||||||
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
||||||
// find the forwardInfo from first
|
// find the forwardInfo from first
|
||||||
for (int i=0; i<pSyncFwds->fwds; ++i) {
|
for (int i = 0; i < pSyncFwds->fwds; ++i) {
|
||||||
pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo;
|
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
|
||||||
if (pFwdRsp->version == pFwdInfo->version) break;
|
if (pFwdRsp->version == pFwdInfo->version) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -848,13 +861,13 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode * pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
SWalHead *pHead = (SWalHead *)cont;
|
SWalHead * pHead = (SWalHead *)cont;
|
||||||
|
|
||||||
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
|
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||||
|
|
||||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||||
//nodeVersion = pHead->version;
|
// nodeVersion = pHead->version;
|
||||||
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
|
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD);
|
||||||
} else {
|
} else {
|
||||||
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
||||||
|
@ -877,12 +890,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
|
||||||
pPeer->version = pPeersStatus->version;
|
pPeer->version = pPeersStatus->version;
|
||||||
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
|
syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role);
|
||||||
|
|
||||||
if (pPeersStatus->ack)
|
if (pPeersStatus->ack) {
|
||||||
syncSendPeersStatusMsgToPeer(pPeer, 0);
|
syncSendPeersStatusMsgToPeer(pPeer, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
||||||
if (pPeer->peerFd <0) return -1;
|
if (pPeer->peerFd < 0) return -1;
|
||||||
|
|
||||||
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
|
int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
|
||||||
if (hlen != sizeof(SSyncHead)) {
|
if (hlen != sizeof(SSyncHead)) {
|
||||||
|
@ -906,9 +920,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int syncProcessPeerMsg(void *param, void *buffer) {
|
static int syncProcessPeerMsg(void *param, void *buffer) {
|
||||||
SSyncPeer * pPeer = param;
|
SSyncPeer *pPeer = param;
|
||||||
SSyncHead head;
|
SSyncHead head;
|
||||||
char *cont = (char *)buffer;
|
char * cont = (char *)buffer;
|
||||||
|
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
|
@ -932,16 +946,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
|
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
|
||||||
|
|
||||||
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
char msg[statusMsgLen] = {0};
|
char msg[statusMsgLen] = {0};
|
||||||
|
|
||||||
if (pPeer->peerFd <0 || pPeer->ip ==0) return;
|
if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
|
||||||
|
|
||||||
SSyncHead *pHead = (SSyncHead *) msg;
|
SSyncHead * pHead = (SSyncHead *)msg;
|
||||||
SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead));
|
SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));
|
||||||
|
|
||||||
pHead->type = TAOS_SMSG_STATUS;
|
pHead->type = TAOS_SMSG_STATUS;
|
||||||
pHead->len = statusMsgLen - sizeof(SSyncHead);
|
pHead->len = statusMsgLen - sizeof(SSyncHead);
|
||||||
|
@ -979,13 +993,13 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
|
||||||
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
||||||
if (connFd < 0) {
|
if (connFd < 0) {
|
||||||
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
|
sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno));
|
||||||
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFirstPkt firstPkt;
|
SFirstPkt firstPkt;
|
||||||
memset(&firstPkt, 0, sizeof(firstPkt));
|
memset(&firstPkt, 0, sizeof(firstPkt));
|
||||||
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0;
|
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
|
||||||
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
|
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
|
||||||
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
|
||||||
firstPkt.port = tsSyncPort;
|
firstPkt.port = tsSyncPort;
|
||||||
|
@ -1000,7 +1014,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
|
||||||
} else {
|
} else {
|
||||||
sDebug("try later");
|
sDebug("try later");
|
||||||
close(connFd);
|
close(connFd);
|
||||||
taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1065,8 +1079,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
||||||
SSyncPeer *pPeer;
|
SSyncPeer *pPeer;
|
||||||
for (i = 0; i < pNode->replica; ++i) {
|
for (i = 0; i < pNode->replica; ++i) {
|
||||||
pPeer = pNode->peerInfo[i];
|
pPeer = pNode->peerInfo[i];
|
||||||
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port))
|
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
|
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
|
||||||
|
@ -1091,8 +1104,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&(pNode->mutex));
|
pthread_mutex_unlock(&(pNode->mutex));
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncProcessBrokenLink(void *param) {
|
static void syncProcessBrokenLink(void *param) {
|
||||||
|
@ -1123,8 +1134,10 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
||||||
pSyncFwds->fwds--;
|
pSyncFwds->fwds--;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncFwds->fwds > 0)
|
if (pSyncFwds->fwds > 0) {
|
||||||
pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo;
|
pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo;
|
||||||
|
}
|
||||||
|
|
||||||
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
|
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
|
||||||
pFwdInfo->version = version;
|
pFwdInfo->version = version;
|
||||||
pFwdInfo->mhandle = mhandle;
|
pFwdInfo->mhandle = mhandle;
|
||||||
|
@ -1140,14 +1153,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
|
|
||||||
int fwds = pSyncFwds->fwds;
|
int fwds = pSyncFwds->fwds;
|
||||||
for (int i=0; i<fwds; ++i) {
|
for (int i = 0; i < fwds; ++i) {
|
||||||
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
|
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
|
||||||
if (pFwdInfo->confirmed == 0) break;
|
if (pFwdInfo->confirmed == 0) break;
|
||||||
|
|
||||||
pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo;
|
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
|
||||||
pSyncFwds->fwds--;
|
pSyncFwds->fwds--;
|
||||||
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
|
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
|
||||||
//sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
|
// sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d",
|
||||||
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
|
// pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
|
||||||
memset(pFwdInfo, 0, sizeof(SFwdInfo));
|
memset(pFwdInfo, 0, sizeof(SFwdInfo));
|
||||||
}
|
}
|
||||||
|
@ -1159,13 +1172,15 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pFwdInfo->acks++;
|
pFwdInfo->acks++;
|
||||||
if (pFwdInfo->acks >= pNode->quorum-1)
|
if (pFwdInfo->acks >= pNode->quorum - 1) {
|
||||||
confirm = 1;
|
confirm = 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pFwdInfo->nacks++;
|
pFwdInfo->nacks++;
|
||||||
if (pFwdInfo->nacks > pNode->replica-pNode->quorum)
|
if (pFwdInfo->nacks > pNode->replica - pNode->quorum) {
|
||||||
confirm = 1;
|
confirm = 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (confirm && pFwdInfo->confirmed == 0) {
|
if (confirm && pFwdInfo->confirmed == 0) {
|
||||||
sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
|
sDebug("vgId:%d, forward is confirmed, ver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
|
||||||
|
@ -1181,8 +1196,8 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
|
||||||
|
|
||||||
if (pSyncFwds->fwds > 0) {
|
if (pSyncFwds->fwds > 0) {
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
for (int i=0; i<pSyncFwds->fwds; ++i) {
|
for (int i = 0; i < pSyncFwds->fwds; ++i) {
|
||||||
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo;
|
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
|
||||||
if (time - pFwdInfo->time < 2000) break;
|
if (time - pFwdInfo->time < 2000) break;
|
||||||
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t vnodeInitResources() {
|
int32_t vnodeInitResources() {
|
||||||
|
int code = syncInit();
|
||||||
|
if (code != 0) return code;
|
||||||
|
|
||||||
vnodeInitWriteFp();
|
vnodeInitWriteFp();
|
||||||
vnodeInitReadFp();
|
vnodeInitReadFp();
|
||||||
|
|
||||||
|
@ -70,11 +73,12 @@ int32_t vnodeInitResources() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeCleanupResources() {
|
void vnodeCleanupResources() {
|
||||||
|
|
||||||
if (tsDnodeVnodesHash != NULL) {
|
if (tsDnodeVnodesHash != NULL) {
|
||||||
taosHashCleanup(tsDnodeVnodesHash);
|
taosHashCleanup(tsDnodeVnodesHash);
|
||||||
tsDnodeVnodesHash = NULL;
|
tsDnodeVnodesHash = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncCleanUp();
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
|
@ -105,6 +105,15 @@ if $dnode4Vnodes != null then
|
||||||
goto show1
|
goto show1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step2
|
print ============================== step2
|
||||||
print ========= start dnode4
|
print ========= start dnode4
|
||||||
sql create dnode $hostname4
|
sql create dnode $hostname4
|
||||||
|
@ -132,6 +141,15 @@ if $dnode4Vnodes != 2 then
|
||||||
goto show2
|
goto show2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step3
|
print ============================== step3
|
||||||
print ========= drop dnode2
|
print ========= drop dnode2
|
||||||
sql drop dnode $hostname2
|
sql drop dnode $hostname2
|
||||||
|
@ -167,6 +185,15 @@ if $dnode4Vnodes != 3 then
|
||||||
goto show3
|
goto show3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
|
||||||
print ============================== step4
|
print ============================== step4
|
||||||
|
@ -195,6 +222,15 @@ if $dnode5Vnodes != 2 then
|
||||||
goto show4
|
goto show4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step5
|
print ============================== step5
|
||||||
print ========= drop dnode3
|
print ========= drop dnode3
|
||||||
sql drop dnode $hostname3
|
sql drop dnode $hostname3
|
||||||
|
@ -232,6 +268,15 @@ endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step6
|
print ============================== step6
|
||||||
sql create dnode $hostname6
|
sql create dnode $hostname6
|
||||||
system sh/exec.sh -n dnode6 -s start
|
system sh/exec.sh -n dnode6 -s start
|
||||||
|
@ -258,6 +303,15 @@ if $dnode6Vnodes != 2 then
|
||||||
goto show6
|
goto show6
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step7
|
print ============================== step7
|
||||||
print ========= drop dnode4
|
print ========= drop dnode4
|
||||||
sql drop dnode $hostname4
|
sql drop dnode $hostname4
|
||||||
|
@ -294,6 +348,14 @@ if $dnode4Vnodes != null then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step8
|
print ============================== step8
|
||||||
sql create dnode $hostname7
|
sql create dnode $hostname7
|
||||||
|
@ -321,6 +383,15 @@ if $dnode7Vnodes != 2 then
|
||||||
goto show8
|
goto show8
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step9
|
print ============================== step9
|
||||||
print ========= drop dnode1
|
print ========= drop dnode1
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
@ -335,15 +406,20 @@ sql show mnodes
|
||||||
$dnode1Role = $data2_1
|
$dnode1Role = $data2_1
|
||||||
$dnode4Role = $data2_4
|
$dnode4Role = $data2_4
|
||||||
$dnode5Role = $data2_5
|
$dnode5Role = $data2_5
|
||||||
print dnode1 ==> $dnode1Role
|
print dnode1 ==> $data2_1
|
||||||
print dnode4 ==> $dnode4Role
|
print dnode2 ==> $data2_2
|
||||||
print dnode5 ==> $dnode5Role
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
if $dnode1Role != offline then
|
if $dnode1Role != offline then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print ============================== step9.1
|
print ============================== step9.1
|
||||||
|
sleep 2000
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
$x = 0
|
$x = 0
|
||||||
|
@ -353,6 +429,19 @@ show9:
|
||||||
if $x == 20 then
|
if $x == 20 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
$dnode1Role = $data2_1
|
||||||
|
$dnode4Role = $data2_4
|
||||||
|
$dnode5Role = $data2_5
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
sql show dnodes -x show9
|
sql show dnodes -x show9
|
||||||
$dnode5Vnodes = $data2_5
|
$dnode5Vnodes = $data2_5
|
||||||
print dnode5 $dnode5Vnodes
|
print dnode5 $dnode5Vnodes
|
||||||
|
@ -374,6 +463,15 @@ endi
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
sleep 5000
|
sleep 5000
|
||||||
|
|
||||||
|
sql show mnodes
|
||||||
|
print dnode1 ==> $data2_1
|
||||||
|
print dnode2 ==> $data2_2
|
||||||
|
print dnode3 ==> $data2_3
|
||||||
|
print dnode4 ==> $data2_4
|
||||||
|
print dnode5 ==> $data2_5
|
||||||
|
print dnode6 ==> $data2_6
|
||||||
|
print dnode7 ==> $data2_7
|
||||||
|
|
||||||
print ============================== step11
|
print ============================== step11
|
||||||
print ========= add db4
|
print ========= add db4
|
||||||
|
|
||||||
|
|
|
@ -667,7 +667,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
|
||||||
|
|
||||||
TAOS_RES* pSql = NULL;
|
TAOS_RES* pSql = NULL;
|
||||||
|
|
||||||
for (int attempt = 0; attempt < 3; ++attempt) {
|
for (int attempt = 0; attempt < 10; ++attempt) {
|
||||||
simLogSql(rest, false);
|
simLogSql(rest, false);
|
||||||
pSql = taos_query(script->taos, rest);
|
pSql = taos_query(script->taos, rest);
|
||||||
ret = taos_errno(pSql);
|
ret = taos_errno(pSql);
|
||||||
|
|
Loading…
Reference in New Issue