sync modify timer
This commit is contained in:
parent
398d5ec3a7
commit
8057e44d17
|
@ -41,7 +41,8 @@ static void syncIOTickPingFunc(void *param, void *tmrId);
|
|||
|
||||
// public function ------------
|
||||
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
sTrace("syncIOSendMsg ... ");
|
||||
sTrace("<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u", clientRpc, pEpSet->numOfEps,
|
||||
pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port);
|
||||
pMsg->handle = NULL;
|
||||
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
|
||||
return 0;
|
||||
|
@ -54,6 +55,7 @@ int32_t syncIOStart(char *host, uint16_t port) {
|
|||
int32_t ret = syncIOStartInternal(gSyncIO);
|
||||
assert(ret == 0);
|
||||
|
||||
sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn
|
|||
SyncPing* pMsg2 = rpcMsg.pCont;
|
||||
cJSON* pJson = syncPing2Json(pMsg2);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
sTrace("syncNodePing pMsg2:%s ", serialized);
|
||||
sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
|
||||
free(serialized);
|
||||
cJSON_Delete(pJson);
|
||||
}
|
||||
|
@ -253,12 +253,16 @@ static void syncNodePingTimerCb(void* param, void* tmrId) {
|
|||
++(pSyncNode->pingTimerCounter);
|
||||
// pSyncNode->pingTimerMS += 100;
|
||||
|
||||
sTrace("pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, tmrId:%p ",
|
||||
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
|
||||
sTrace(
|
||||
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
|
||||
"tmrId:%p ",
|
||||
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
|
||||
|
||||
syncNodePingAll(pSyncNode);
|
||||
|
||||
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pPingTimer);
|
||||
|
||||
syncNodePingAll(pSyncNode);
|
||||
} else {
|
||||
sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
|
||||
}
|
||||
}
|
|
@ -70,7 +70,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
|||
cJSON* pDestId = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pDestId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", pMsg->data);
|
||||
|
@ -145,7 +145,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
|||
cJSON* pDestId = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pDestId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", pMsg->data);
|
||||
|
|
|
@ -51,11 +51,10 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
|
|||
pEpSet->numOfEps = 1;
|
||||
pEpSet->inUse = 0;
|
||||
pEpSet->eps[0].port = port;
|
||||
snprintf(epSet.eps[0].fqdn, sizeof(epSet.eps[0].fqdn), host);
|
||||
snprintf(pEpSet->eps[0].fqdn, sizeof(pEpSet->eps[0].fqdn), "%s", host);
|
||||
*/
|
||||
|
||||
pEpSet->inUse = 0;
|
||||
pEpSet->numOfEps = 1;
|
||||
pEpSet->numOfEps = 0;
|
||||
addEpIntoEpSet(pEpSet, host, port);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ int main() {
|
|||
SRpcMsg rpcMsg;
|
||||
rpcMsg.contLen = 64;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
|
||||
snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
|
||||
rpcMsg.handle = NULL;
|
||||
rpcMsg.msgType = 77;
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ int main() {
|
|||
SRpcMsg rpcMsg;
|
||||
rpcMsg.contLen = 64;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
|
||||
snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest");
|
||||
rpcMsg.handle = NULL;
|
||||
rpcMsg.msgType = 77;
|
||||
|
||||
|
|
|
@ -28,13 +28,16 @@ SSyncNode* doSync() {
|
|||
pCfg->replicaNum = 1;
|
||||
|
||||
pCfg->nodeInfo[0].nodePort = 7010;
|
||||
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
|
||||
pCfg->nodeInfo[1].nodePort = 7110;
|
||||
taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
|
||||
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
|
||||
|
||||
pCfg->nodeInfo[2].nodePort = 7210;
|
||||
taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
|
||||
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
|
||||
|
||||
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
|
Loading…
Reference in New Issue