diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d2e82d25f6..9db9550681 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -561,8 +561,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) stub.createTime = taosGetTimestampMs(); stub.rpcMsg = *pMsg; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - sDebug("vgId:%d sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), - seqNum, pMsg->info.handle); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); SRpcMsg rpcMsg; @@ -771,7 +769,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } // tools - pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); + pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, 0); assert(pSyncNode->pSyncRespMgr != NULL); // restore state diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 642e572434..5087cacd02 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -44,6 +44,10 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { uint64_t keyCode = ++(pObj->seqNum); taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); + SSyncNode *pSyncNode = pObj->data; + sDebug("vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle); + taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; } @@ -63,6 +67,11 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index)); if (pTmp != NULL) { memcpy(pStub, pTmp, sizeof(SRespStub)); + + SSyncNode *pSyncNode = pObj->data; + sDebug("vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object } @@ -76,6 +85,11 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index)); if (pTmp != NULL) { memcpy(pStub, pTmp, sizeof(SRespStub)); + + SSyncNode *pSyncNode = pObj->data; + sDebug("vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index d12c5058cc..4026596548 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -14,6 +14,8 @@ */ #include "syncUtil.h" +#include + #include "syncEnv.h" void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); @@ -21,8 +23,31 @@ void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); // ---- encode / decode uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t u64; + + uint32_t hostU32 = taosGetIpv4FromFqdn(host); + if (hostU32 == (uint32_t)-1) { + sError("Get IP address error"); + return -1; + } + + /* uint32_t hostU32 = (uint32_t)taosInetAddr(host); - // assert(hostU32 != (uint32_t)-1); + if (hostU32 == (uint32_t)-1) { + struct hostent* hostEnt = gethostbyname(host); + if (hostEnt == NULL) { + sError("Get IP address error"); + return -1; + } + + const char* newHost = taosInetNtoa(*(struct in_addr*)(hostEnt->h_addr_list[0])); + hostU32 = (uint32_t)taosInetAddr(newHost); + if (hostU32 == (uint32_t)-1) { + sError("change %s to id, error", newHost); + } + // ASSERT(hostU32 != (uint32_t)-1); + } + */ + u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); return u64; }