sync modify timer
This commit is contained in:
parent
aaf5e20fdc
commit
ee43a70c4d
|
@ -48,23 +48,23 @@ extern int32_t sDebugFlag;
|
||||||
taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \
|
taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
#define sInfo(...) \
|
#define sInfo(...) \
|
||||||
{ \
|
{ \
|
||||||
if (sDebugFlag & DEBUG_INFO) { \
|
if (sDebugFlag & DEBUG_INFO) { \
|
||||||
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
|
taosPrintLog("SYN INFO ", sDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
#define sDebug(...) \
|
#define sDebug(...) \
|
||||||
{ \
|
{ \
|
||||||
if (sDebugFlag & DEBUG_DEBUG) { \
|
if (sDebugFlag & DEBUG_DEBUG) { \
|
||||||
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
|
taosPrintLog("SYN DEBUG ", sDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
#define sTrace(...) \
|
#define sTrace(...) \
|
||||||
{ \
|
{ \
|
||||||
if (sDebugFlag & DEBUG_TRACE) { \
|
if (sDebugFlag & DEBUG_TRACE) { \
|
||||||
taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \
|
taosPrintLog("SYN TRACE ", sDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SRaft;
|
struct SRaft;
|
||||||
|
|
|
@ -72,6 +72,10 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
|
||||||
|
|
||||||
cJSON* syncPing2Json(const SyncPing* pMsg);
|
cJSON* syncPing2Json(const SyncPing* pMsg);
|
||||||
|
|
||||||
|
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
|
||||||
|
|
||||||
|
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
|
||||||
|
|
||||||
typedef struct SyncPingReply {
|
typedef struct SyncPingReply {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
uint32_t msgType;
|
uint32_t msgType;
|
||||||
|
|
|
@ -38,6 +38,7 @@ static void syncIOTick(void *param, void *tmrId);
|
||||||
// ----------------------------
|
// ----------------------------
|
||||||
|
|
||||||
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
|
sTrace("syncIOSendMsg ... ");
|
||||||
pMsg->handle = NULL;
|
pMsg->handle = NULL;
|
||||||
rpcSendRequest(handle, pEpSet, pMsg, NULL);
|
rpcSendRequest(handle, pEpSet, pMsg, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -74,7 +75,7 @@ static void syncIOTick(void *param, void *tmrId) {
|
||||||
|
|
||||||
taosWriteQitem(io->pMsgQ, pTemp);
|
taosWriteQitem(io->pMsgQ, pTemp);
|
||||||
|
|
||||||
taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer);
|
taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, &io->syncTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *syncIOConsumer(void *param) {
|
static void *syncIOConsumer(void *param) {
|
||||||
|
@ -191,7 +192,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
|
||||||
{
|
{
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 38000;
|
rpcInit.localPort = 7010;
|
||||||
rpcInit.label = "SYNC-IO-SERVER";
|
rpcInit.label = "SYNC-IO-SERVER";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = syncIODoRequest;
|
rpcInit.cfp = syncIODoRequest;
|
||||||
|
@ -209,7 +210,7 @@ static int32_t doSyncIOStart(SSyncIO *io) {
|
||||||
}
|
}
|
||||||
|
|
||||||
io->epSet.inUse = 0;
|
io->epSet.inUse = 0;
|
||||||
addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000);
|
addEpIntoEpSet(&io->epSet, "127.0.0.1", 7010);
|
||||||
|
|
||||||
// start consumer thread
|
// start consumer thread
|
||||||
{
|
{
|
||||||
|
@ -221,8 +222,8 @@ static int32_t doSyncIOStart(SSyncIO *io) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start tmr thread
|
// start tmr thread
|
||||||
// io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
|
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
|
||||||
// io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager);
|
io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,37 +113,36 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodePingAll(SSyncNode* pSyncNode) {
|
void syncNodePingAll(SSyncNode* pSyncNode) {
|
||||||
sTrace("syncNodePingAll %p ", pSyncNode);
|
sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
|
||||||
SyncPing* pMsg = syncPingBuild(strlen("ping") + 1);
|
SRaftId destId;
|
||||||
memcpy(pMsg->data, "ping", strlen("ping") + 1);
|
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &pMsg->destId);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
|
||||||
pMsg->srcId = pSyncNode->raftId;
|
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
||||||
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
|
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
syncPingDestroy(pMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodePingPeers(SSyncNode* pSyncNode) {
|
void syncNodePingPeers(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SyncPing* pSyncPing;
|
SRaftId destId;
|
||||||
SRaftId raftId;
|
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &raftId);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
|
||||||
ret = syncNodePing(pSyncNode, &raftId, pSyncPing);
|
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
syncPingDestroy(pMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodePingSelf(SSyncNode* pSyncNode) {
|
void syncNodePingSelf(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret;
|
||||||
SyncPing* pMsg = syncPingBuild(strlen("ping") + 1);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
|
||||||
memcpy(pMsg->data, "ping", strlen("ping") + 1);
|
|
||||||
pMsg->destId = pSyncNode->raftId;
|
|
||||||
pMsg->srcId = pSyncNode->raftId;
|
|
||||||
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
|
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
syncPingDestroy(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
|
||||||
|
@ -167,10 +166,29 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
// ------ local funciton ---------
|
// ------ local funciton ---------
|
||||||
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
||||||
|
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncPing2RpcMsg(pMsg, &rpcMsg);
|
syncPing2RpcMsg(pMsg, &rpcMsg);
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
|
{
|
||||||
|
cJSON* pJson = syncPing2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
sTrace("syncNodePing pMsg:%s ", serialized);
|
||||||
|
free(serialized);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SyncPing* pMsg2 = rpcMsg.pCont;
|
||||||
|
cJSON* pJson = syncPing2Json(pMsg2);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
sTrace("syncNodePing pMsg2:%s ", serialized);
|
||||||
|
free(serialized);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +203,7 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
|
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
|
||||||
|
sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilraftId2EpSet(destRaftId, &epSet);
|
syncUtilraftId2EpSet(destRaftId, &epSet);
|
||||||
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
|
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
|
||||||
|
|
|
@ -80,6 +80,20 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
||||||
return pJson;
|
return pJson;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) {
|
||||||
|
uint32_t dataLen = strlen(str) + 1;
|
||||||
|
SyncPing* pMsg = syncPingBuild(dataLen);
|
||||||
|
pMsg->srcId = *srcId;
|
||||||
|
pMsg->destId = *destId;
|
||||||
|
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) {
|
||||||
|
SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping");
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
// ---- message process SyncPingReply----
|
// ---- message process SyncPingReply----
|
||||||
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
|
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
|
||||||
uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen;
|
uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen;
|
||||||
|
|
|
@ -25,7 +25,7 @@ SSyncNode* doSync() {
|
||||||
|
|
||||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
pCfg->myIndex = 0;
|
pCfg->myIndex = 0;
|
||||||
pCfg->replicaNum = 3;
|
pCfg->replicaNum = 1;
|
||||||
|
|
||||||
pCfg->nodeInfo[0].nodePort = 7010;
|
pCfg->nodeInfo[0].nodePort = 7010;
|
||||||
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
@ -63,6 +63,7 @@ int main() {
|
||||||
ret = syncEnvStart();
|
ret = syncEnvStart();
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
|
||||||
|
/*
|
||||||
SSyncNode* pSyncNode = doSync();
|
SSyncNode* pSyncNode = doSync();
|
||||||
|
|
||||||
ret = syncNodeStartPingTimer(pSyncNode);
|
ret = syncNodeStartPingTimer(pSyncNode);
|
||||||
|
@ -72,6 +73,7 @@ int main() {
|
||||||
|
|
||||||
ret = syncNodeStopPingTimer(pSyncNode);
|
ret = syncNodeStopPingTimer(pSyncNode);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
*/
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
|
|
Loading…
Reference in New Issue