This commit is contained in:
Shengliang Guan 2020-12-17 16:02:13 +08:00
parent 8d110d84ef
commit 4069969a89
3 changed files with 27 additions and 32 deletions

View File

@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
//global configurable parameters //global configurable parameters
extern int32_t tsMaxSyncNum;
extern int32_t tsSyncTcpThreads;
extern int32_t tsSyncTimer;
extern int32_t tsMaxFwdInfo;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
extern char tsArbitrator[]; extern char tsArbitrator[];
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;

View File

@ -29,11 +29,17 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define SYNC_TCP_THREADS 2
#define SYNC_MAX_NUM 2
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000 #define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 #define SYNC_CHECK_INTERVAL 1 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version

View File

@ -29,19 +29,12 @@
#include "syncTcp.h" #include "syncTcp.h"
#include "syncInt.h" #include "syncInt.h"
// global configurable int32_t tsSyncNum = 0; // number of sync in process in whole system
int32_t tsMaxSyncNum = 2; char tsNodeFqdn[TSDB_FQDN_LEN] = {0};
int32_t tsSyncTcpThreads = 2;
int32_t tsMaxFwdInfo = 512;
int32_t tsSyncTimer = 1;
// module global, not configurable static void * tsTcpPool = NULL;
int32_t tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN];
static void * tsTcpPool;
static void * tsSyncTmrCtrl = NULL; static void * tsSyncTmrCtrl = NULL;
static void * tsVgIdHash; static void * tsVgIdHash = NULL;
static int32_t tsSyncRefId = -1; static int32_t tsSyncRefId = -1;
// local functions // local functions
@ -83,7 +76,7 @@ char *syncStatus[] = {
int32_t syncInit() { int32_t syncInit() {
SPoolInfo info = {0}; SPoolInfo info = {0};
info.numOfThreads = tsSyncTcpThreads; info.numOfThreads = SYNC_TCP_THREADS;
info.serverIp = 0; info.serverIp = 0;
info.port = tsSyncPort; info.port = tsSyncPort;
info.bufferSize = SYNC_MAX_SIZE; info.bufferSize = SYNC_MAX_SIZE;
@ -107,7 +100,7 @@ int32_t syncInit() {
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) { if (tsVgIdHash == NULL) {
sError("failed to init tsVgIdHash"); sError("failed to init vgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
syncCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
@ -210,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]); syncRole[nodeRole]);
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -750,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id); sDebug("%s, check peer connection in 1000 ms", pPeer->id);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -828,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
// Ensure the sync of mnode not interrupted // Ensure the sync of mnode not interrupted
if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
@ -839,7 +832,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
SSyncMsg msg; SSyncMsg msg;
syncBuildSyncReqMsg(&msg, pNode->vgId); syncBuildSyncReqMsg(&msg, pNode->vgId);
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
@ -859,7 +852,7 @@ static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first // find the forwardInfo from first
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS;
if (pFwdRsp->version == pFwdInfo->version) { if (pFwdRsp->version == pFwdInfo->version) {
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode); syncRemoveConfirmedFwdInfo(pNode);
@ -995,7 +988,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
@ -1011,7 +1004,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd); taosClose(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -1140,15 +1133,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) { if (pSyncFwds->fwds >= SYNC_MAX_FWDS) {
// pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
// pSyncFwds->fwds--; // pSyncFwds->fwds--;
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds); sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
return TSDB_CODE_SYN_TOO_MANY_FWDINFO; return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
} }
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS;
} }
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
@ -1171,7 +1164,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break; if (pFwdInfo->confirmed == 0) break;
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
pSyncFwds->fwds--; pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
@ -1237,7 +1230,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break; if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,