From cca2bcdb280275be2daede2ef139cd8a31e26080 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 13:09:08 +0800 Subject: [PATCH 1/7] enh: rpc set default epset --- include/libs/transport/trpc.h | 1 + source/libs/transport/inc/transComm.h | 7 ++-- source/libs/transport/src/trans.c | 4 +++ source/libs/transport/src/transCli.c | 47 +++++++++++++++++++++++++-- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 754a203471..752a0adc5b 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,6 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst); #ifdef __cplusplus } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 30f799f39e..654bfa7158 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -95,8 +95,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -155,7 +155,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release, Register } STransMsgType; +typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -231,6 +231,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); +void transSetDefaultEpSet(void* shandle, const SEpSet* dst); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 9e71c87fa5..2e47eb493a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -148,6 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); } +void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) { + // later + transSetDefaultEpSet(thandle, dst); +} int32_t rpcInit() { // impl later diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 92c5e9faf7..9d43265b80 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,7 +63,11 @@ typedef struct SCliThrdObj { SDelayQueue* delayQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // - bool quit; + + bool useDefaultEpSet; + SEpSet defaultEpSet; + + bool quit; } SCliThrdObj; typedef struct SCliObj { @@ -116,7 +120,9 @@ static void cliHandleExcept(SCliConn* conn); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, + cliHandleUpdate}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -683,6 +689,15 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { transUnrefCliHandle(conn); } } +static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + + pThrd->useDefaultEpSet = true; + pThrd->defaultEpSet = pCtx->epSet; + + tsem_post(pCtx->pSem); + destroyCmsg(pMsg); +} SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; @@ -712,6 +727,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; + if (pThrd->useDefaultEpSet) { + pCtx->epSet = pThrd->defaultEpSet; + } + SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; @@ -1067,4 +1086,28 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(pSem); } +void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { + STrans* pTransInst = ahandle; + for (int i = 0; i < pTransInst->numOfThreads; i++) { + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->hThrdIdx = i; + pCtx->epSet = *dst; + pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); + tsem_init(pCtx->pSem, 0, 0); + + SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->type = Update; + + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; + tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); + + tsem_t* pSem = pCtx->pSem; + transSendAsync(thrd->asyncPool, &(cliMsg->q)); + + tsem_wait(pSem); + tsem_destroy(pSem); + taosMemoryFree(pSem); + } +} #endif From f75bcdf4b246305d4854025d8b5e0f9a9a3fd649 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 13:19:21 +0800 Subject: [PATCH 2/7] enh: set useDefaultEpset --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9d43265b80..efe777a74e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -860,7 +860,7 @@ static SCliThrdObj* createThrdObj() { pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); - + pThrd->useDefaultEpSet = false; transDQCreate(pThrd->loop, &pThrd->delayQueue); pThrd->quit = false; From c1a774c64feb9e7f7fd411254a4bb2a9981d031d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 25 May 2022 14:46:34 +0800 Subject: [PATCH 3/7] enh/defaultEpSet --- source/client/src/clientMsgHandler.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index dfce01dd63..33d0d9feb4 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -58,7 +58,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + if (connectRsp.dnodeNum == 1) { + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + rpcSetDefaultEpSet(pTscObj->pAppInfo->pTransporter, &epset); + } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } From 8acbb20fb2a75dc0c68cae7bdbb0145c0cf3b38e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 15:22:23 +0800 Subject: [PATCH 4/7] enh: set default epset --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSrv.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index efe777a74e..5a02e38acd 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -122,7 +122,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, - cliHandleUpdate}; + NULL, cliHandleUpdate}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 36f5cf9815..e21eb6ec80 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -146,7 +146,7 @@ static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister}; + uvHandleRegister, NULL}; static int32_t exHandlesMgt; From 5f8420345631146f7a3ac75118b017f8e6d69370 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 15:47:52 +0800 Subject: [PATCH 5/7] update default epset --- source/libs/transport/src/transCli.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5a02e38acd..3be37ff4cc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -695,7 +695,7 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->useDefaultEpSet = true; pThrd->defaultEpSet = pCtx->epSet; - tsem_post(pCtx->pSem); + // tsem_post(pCtx->pSem); destroyCmsg(pMsg); } @@ -1092,8 +1092,8 @@ void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->hThrdIdx = i; pCtx->epSet = *dst; - pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(pCtx->pSem, 0, 0); + // pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); + // tsem_init(pCtx->pSem, 0, 0); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; @@ -1104,10 +1104,10 @@ void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { tsem_t* pSem = pCtx->pSem; transSendAsync(thrd->asyncPool, &(cliMsg->q)); + // tsem_wait(pSem); - tsem_wait(pSem); - tsem_destroy(pSem); - taosMemoryFree(pSem); + // tsem_destroy(pSem); + // taosMemoryFree(pSem); } } #endif From 511a4c3a4f3c450bee67b5409f440e6441ec9b6e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 May 2022 11:29:31 +0800 Subject: [PATCH 6/7] update default epset --- include/libs/transport/trpc.h | 2 +- source/client/src/clientMsgHandler.c | 13 +++++--- source/libs/transport/inc/transComm.h | 10 +++++- source/libs/transport/src/trans.c | 5 +-- source/libs/transport/src/transCli.c | 48 ++++++++++++++++----------- 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 752a0adc5b..02cc78fa81 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,7 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst); +void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); #ifdef __cplusplus } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 33d0d9feb4..f15315fe60 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -59,8 +59,10 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { } if (connectRsp.dnodeNum == 1) { - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - rpcSetDefaultEpSet(pTscObj->pAppInfo->pTransporter, &epset); + SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + SEpSet dstEpSet = connectRsp.epSet; + rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, + dstEpSet.eps[dstEpSet.inUse].fqdn); } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } @@ -129,9 +131,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (usedbRsp.vgVersion >= 0) { uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId; - int32_t code1 = catalogGetHandle(clusterId, &pCatalog); + int32_t code1 = catalogGetHandle(clusterId, &pCatalog); if (code1 != TSDB_CODE_SUCCESS) { - tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); + tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, + tstrerror(code1)); } else { catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); } @@ -161,7 +164,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); taosMemoryFreeClear(output.dbVgroup); - tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr()); + tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup) { struct SCatalog* pCatalog = NULL; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 654bfa7158..7a4c44fe2e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -104,6 +104,13 @@ typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; +/*convet from fqdn to ip */ +typedef struct SCvtAddr { + char ip[TSDB_FQDN_LEN]; + char fqdn[TSDB_FQDN_LEN]; + bool cvt; +} SCvtAddr; + typedef struct { SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app @@ -115,6 +122,7 @@ typedef struct { STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API + SCvtAddr cvtAddr; int hThrdIdx; } STransConnCtx; @@ -231,7 +239,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); -void transSetDefaultEpSet(void* shandle, const SEpSet* dst); +void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 2e47eb493a..d8b2ca8e0c 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -148,9 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); } -void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) { + +void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later - transSetDefaultEpSet(thandle, dst); + transSetDefaultAddr(thandle, ip, fqdn); } int32_t rpcInit() { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3be37ff4cc..3abaa625f6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -64,8 +64,7 @@ typedef struct SCliThrdObj { uint64_t nextTimeout; // next timeout void* pTransInst; // - bool useDefaultEpSet; - SEpSet defaultEpSet; + SCvtAddr cvtAddr; bool quit; } SCliThrdObj; @@ -107,6 +106,7 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn); +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); /* * set TCP connection timeout per-socket level */ @@ -692,10 +692,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; - pThrd->useDefaultEpSet = true; - pThrd->defaultEpSet = pCtx->epSet; - - // tsem_post(pCtx->pSem); + pThrd->cvtAddr = pCtx->cvtAddr; destroyCmsg(pMsg); } @@ -717,7 +714,17 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } return conn; } - +void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) { + if (pCvtAddr->cvt == false) { + return; + } + for (int i = 0; i < pEpSet->numOfEps && pEpSet->numOfEps == 1; i++) { + if (strncmp(pEpSet->eps[i].fqdn, pCvtAddr->fqdn, TSDB_FQDN_LEN) == 0) { + memset(pEpSet->eps[i].fqdn, 0, TSDB_FQDN_LEN); + memcpy(pEpSet->eps[i].fqdn, pCvtAddr->ip, TSDB_FQDN_LEN); + } + } +} void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -727,9 +734,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; - if (pThrd->useDefaultEpSet) { - pCtx->epSet = pThrd->defaultEpSet; - } + cliMayCvtFqdnToIp(&pCtx->epSet, &pCtx->cvtAddr); SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { @@ -860,7 +865,6 @@ static SCliThrdObj* createThrdObj() { pThrd->timer.data = pThrd; pThrd->pool = createConnPool(4); - pThrd->useDefaultEpSet = false; transDQCreate(pThrd->loop, &pThrd->delayQueue); pThrd->quit = false; @@ -1086,28 +1090,32 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(pSem); } -void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) { +/* + * + **/ +void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) { STrans* pTransInst = ahandle; + + SCvtAddr cvtAddr = {0}; + if (ip != NULL && fqdn != NULL) { + memcpy(cvtAddr.ip, ip, strlen(ip)); + memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); + cvtAddr.cvt = true; + } for (int i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->hThrdIdx = i; - pCtx->epSet = *dst; - // pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); - // tsem_init(pCtx->pSem, 0, 0); + pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->type = Update; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; - tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); + tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread); tsem_t* pSem = pCtx->pSem; transSendAsync(thrd->asyncPool, &(cliMsg->q)); - // tsem_wait(pSem); - - // tsem_destroy(pSem); - // taosMemoryFree(pSem); } } #endif From d0213972a244825733aa4f93c536aab9f4ed0bec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 May 2022 11:46:26 +0800 Subject: [PATCH 7/7] update default epset --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3abaa625f6..2d8ca7cead 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -734,7 +734,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; - cliMayCvtFqdnToIp(&pCtx->epSet, &pCtx->cvtAddr); + cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) {