diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8b7311fe3b..d20b992315 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -137,10 +137,19 @@ typedef struct SCvtAddr { } SCvtAddr; typedef struct { - SEpSet epSet; // ip list provided by app - SEpSet origEpSet; - void* ahandle; // handle provided by app - tmsg_t msgType; // message type + int32_t inUse; + int32_t numOfEps; + SEp eps[]; +} SReqEpSet; + +int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet); +int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet); + +typedef struct { + SReqEpSet* epSet; // ip list provided by app + SReqEpSet* origEpSet; + void* ahandle; // handle provided by app + tmsg_t msgType; // message type STransCtx userCtx; // STransMsg* pRsp; // for synchronous API @@ -438,9 +447,9 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); void transDQCancel(SDelayQueue* queue, SDelayTask* task); -bool transEpSetIsEqual(SEpSet* a, SEpSet* b); +bool transRepEpsetIsEqual(SReqEpSet* a, SReqEpSet* b); -bool transEpSetIsEqual2(SEpSet* a, SEpSet* b); +bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b); /* * init global func */ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fe3e45ec8c..3b40b3931f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -230,7 +230,7 @@ static void doFreeTimeoutMsg(void* param); static void cliDestroyBatch(SCliBatch* pBatch); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx); -static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); +static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SReqEpSet* pEpSet, const SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* thrd, SCliReq* pReq, STransMsg* resp); @@ -831,8 +831,8 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p // code static int32_t cliGetOrCreateConn(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { // impl later - char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + char* fqdn = EPSET_GET_INUSE_IP(pReq->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); @@ -932,8 +932,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { int32_t code = 0; SCliConn* pConn = NULL; - char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + char* ip = EPSET_GET_INUSE_IP(pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet); TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); @@ -1559,7 +1559,7 @@ static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; } -FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { +FORCE_INLINE int32_t cliMayCvtFqdnToIp(SReqEpSet* pEpSet, const SCvtAddr* pCvtAddr) { if (pCvtAddr == NULL) { return 0; } @@ -1583,7 +1583,7 @@ FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) { if (code != 0) return false; - return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; + return transRepEpsetIsEqual(pCtx->epSet, pCtx->origEpSet) ? false : true; } FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMsg* pResp) { @@ -1770,8 +1770,8 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { char addr[TSDB_FQDN_LEN + 64] = {0}; - char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); - int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + char* ip = EPSET_GET_INUSE_IP(pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet); CONN_CONSTRUCT_HASH_KEY(addr, ip, port); pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); @@ -1860,8 +1860,8 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { STrans* pInst = pThrd->pInst; SReqCtx* pCtx = pReq->ctx; - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char* ip = EPSET_GET_INUSE_IP(pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); size_t klen = strlen(key); @@ -2007,7 +2007,9 @@ static FORCE_INLINE void destroyReq(void* arg) { STraceId* trace = &pReq->msg.info.traceId; tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); - if (pReq->ctx) destroyReqCtx(pReq->ctx); + if (pReq->ctx) { + destroyReqCtx(pReq->ctx); + } transFreeMsg(pReq->msg.pCont); taosMemoryFree(pReq); } @@ -2102,7 +2104,7 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { if (pReq->ctx == NULL) { return 0; } - return cliMayCvtFqdnToIp(&pReq->ctx->epSet, pThrd->pCvtAddr); + return cliMayCvtFqdnToIp(pReq->ctx->epSet, pThrd->pCvtAddr); } int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = thrd; @@ -2319,7 +2321,13 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd); } -static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx) { taosMemoryFree(ctx); } +static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx) { + if (ctx) { + taosMemoryFree(ctx->epSet); + taosMemoryFree(ctx->origEpSet); + taosMemoryFree(ctx); + } +} int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully @@ -2391,7 +2399,7 @@ static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) { STraceId* trace = &pReq->msg.info.traceId; char tbuf[512] = {0}; - code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + code = epsetToStr((SEpSet*)pCtx->epSet, tbuf, tListLen(tbuf)); if (code != 0) { tWarn("failed to debug epset since %s", tstrerror(code)); return; @@ -2410,7 +2418,7 @@ static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) { char tbuf[512] = {0}; - code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + code = epsetToStr((SEpSet*)pCtx->epSet, tbuf, tListLen(tbuf)); if (code != 0) { tWarn("failed to debug epset since %s", tstrerror(code)); return; @@ -2442,7 +2450,6 @@ static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliReq* pReq, SCliThrd* pThrd FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { int32_t code = 0; SReqCtx* ctx = pReq->ctx; - SEpSet* dst = &ctx->epSet; if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; @@ -2453,7 +2460,8 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { tError("failed to deserialize epset, code:%d", code); return false; } - int32_t tlen = tSerializeSEpSet(NULL, 0, dst); + SEpSet tepset; + int32_t tlen = tSerializeSEpSet(NULL, 0, &tepset); char* buf = NULL; int32_t len = pResp->contLen - tlen; @@ -2473,7 +2481,7 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { pResp->info.hasEpSet = 1; - epsetAssign(dst, &epset); + transCreateReqEpsetFromUserEpset(&epset, &ctx->epSet); return true; } @@ -2481,34 +2489,35 @@ bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) { bool noDelay = true; if (hasEpSet == false) { if (pResp->contLen == 0) { - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) { noDelay = false; } else { - EPSET_FORWARD_INUSE(&pCtx->epSet); + EPSET_FORWARD_INUSE(pCtx->epSet); } } else if (pResp->contLen != 0) { SEpSet epSet; int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); if (valid < 0) { tDebug("get invalid epset, epset equal, continue"); - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) { noDelay = false; } else { - EPSET_FORWARD_INUSE(&pCtx->epSet); + EPSET_FORWARD_INUSE(pCtx->epSet); } } else { - if (!transEpSetIsEqual2(&pCtx->epSet, &epSet)) { + if (!transCompareReqAndUserEpset(pCtx->epSet, &epSet)) { tDebug("epset not equal, retry new epset1"); - transPrintEpSet(&pCtx->epSet); + transPrintEpSet((SEpSet*)pCtx->epSet); transPrintEpSet(&epSet); - epsetAssign(&pCtx->epSet, &epSet); + + transCreateReqEpsetFromUserEpset(&epSet, &pCtx->epSet); noDelay = false; } else { - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) { noDelay = false; } else { tDebug("epset equal, continue"); - EPSET_FORWARD_INUSE(&pCtx->epSet); + EPSET_FORWARD_INUSE(pCtx->epSet); } } } @@ -2518,24 +2527,24 @@ bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) { int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); if (valid < 0) { tDebug("get invalid epset, epset equal, continue"); - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) { noDelay = false; } else { - EPSET_FORWARD_INUSE(&pCtx->epSet); + EPSET_FORWARD_INUSE(pCtx->epSet); } } else { - if (!transEpSetIsEqual2(&pCtx->epSet, &epSet)) { + if (!transCompareReqAndUserEpset(pCtx->epSet, &epSet)) { tDebug("epset not equal, retry new epset2"); - transPrintEpSet(&pCtx->epSet); + transPrintEpSet((SEpSet*)pCtx->epSet); transPrintEpSet(&epSet); - epsetAssign(&pCtx->epSet, &epSet); + transCreateReqEpsetFromUserEpset(&epSet, &pCtx->epSet); noDelay = false; } else { - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) { noDelay = false; } else { tDebug("epset equal, continue"); - EPSET_FORWARD_INUSE(&pCtx->epSet); + EPSET_FORWARD_INUSE(pCtx->epSet); } } } @@ -2675,7 +2684,7 @@ void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) { } // check whole vnodes is offline on this vgroup - if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) { + if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps || pCtx->retryStep > 0) { if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED; } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) { @@ -2710,7 +2719,10 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); if (cliIsEpsetUpdated(pResp->code, pCtx)) { pSyncMsg->hasEpSet = 1; - epsetAssign(&pSyncMsg->epSet, &pCtx->epSet); + + SEpSet epset = {0}; + transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset); + epsetAssign(&pSyncMsg->epSet, &epset); } TAOS_UNUSED(tsem2_post(pSyncMsg->pSem)); TAOS_UNUSED(taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef)); @@ -2721,12 +2733,16 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { } else { tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pResp->info.hasEpSet == 1) { - pInst->cfp(pInst->parent, pResp, &pCtx->epSet); + SEpSet epset = {0}; + transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset); + pInst->cfp(pInst->parent, pResp, &epset); } else { if (!cliIsEpsetUpdated(pResp->code, pCtx)) { pInst->cfp(pInst->parent, pResp, NULL); } else { - pInst->cfp(pInst->parent, pResp, &pCtx->epSet); + SEpSet epset = {0}; + transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset); + pInst->cfp(pInst->parent, pResp, &epset); } } } @@ -2862,6 +2878,7 @@ int32_t transReleaseCliHandle(void* handle) { } static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) { + int32_t code = 0; TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); @@ -2869,8 +2886,18 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe return terrno; } - epsetAssign(&pCtx->epSet, pEpSet); - epsetAssign(&pCtx->origEpSet, pEpSet); + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet); + if (code != 0) { + taosMemoryFree(pCtx); + return code; + } + + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet); + if (code != 0) { + taosMemoryFree(pCtx); + taosMemoryFree(pCtx->epSet); + return code; + } pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; @@ -2983,7 +3010,9 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr pReq->pCont = NULL; return TSDB_CODE_RPC_MODULE_QUIT; } - int32_t code = 0; + int32_t code = 0; + SCliReq* pCliReq = NULL; + SReqCtx* pCtx = NULL; STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransRsp == NULL) { @@ -3008,25 +3037,36 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr if (pReq->info.traceId.msgId == 0) TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); - SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); + pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); if (pCtx == NULL) { TAOS_UNUSED(tsem_destroy(sem)); taosMemoryFree(sem); TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); } - epsetAssign(&pCtx->epSet, pEpSet); - epsetAssign(&pCtx->origEpSet, pEpSet); + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet); + if (code != 0) { + (TAOS_UNUSED(tsem_destroy(sem))); + taosMemoryFree(sem); + TAOS_CHECK_GOTO(code, NULL, _RETURN1); + } + + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet); + if (code != 0) { + (TAOS_UNUSED(tsem_destroy(sem))); + taosMemoryFree(sem); + TAOS_CHECK_GOTO(code, NULL, _RETURN1); + } + pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->pSem = sem; pCtx->pRsp = pTransRsp; - SCliReq* pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); + pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); if (pCliReq == NULL) { (TAOS_UNUSED(tsem_destroy(sem))); taosMemoryFree(sem); - taosMemoryFree(pCtx); TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); } @@ -3037,7 +3077,7 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); + EPSET_GET_INUSE_IP(pCtx->epSet), EPSET_GET_INUSE_PORT(pCtx->epSet), pReq->info.ahandle); code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); if (code != 0) { @@ -3059,6 +3099,11 @@ _RETURN1: taosMemoryFree(pTransRsp); taosMemoryFree(pReq->pCont); pReq->pCont = NULL; + if (pCtx != NULL) { + taosMemoryFree(pCtx->epSet); + taosMemoryFree(pCtx->origEpSet); + taosMemoryFree(pCtx); + } return code; } @@ -3126,8 +3171,16 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq TAOS_CHECK_GOTO(terrno, NULL, _RETURN2); } - epsetAssign(&pCtx->epSet, pEpSet); - epsetAssign(&pCtx->origEpSet, pEpSet); + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet); + if (code != 0) { + taosMemoryFreeClear(pCtx->epSet); + TAOS_CHECK_GOTO(code, NULL, _RETURN2); + } + code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet); + if (code != 0) { + taosMemoryFreeClear(pCtx->epSet); + TAOS_CHECK_GOTO(code, NULL, _RETURN2); + } pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; @@ -3156,7 +3209,7 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); + EPSET_GET_INUSE_IP(pCtx->epSet), EPSET_GET_INUSE_PORT(pCtx->epSet), pReq->info.ahandle); code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); if (code != 0) { @@ -3183,6 +3236,12 @@ _RETURN: return code; _RETURN2: transFreeMsg(pReq->pCont); + + if (pCtx != NULL) { + taosMemoryFree(pCtx->epSet); + taosMemoryFree(pCtx->origEpSet); + taosMemoryFree(pCtx); + } pReq->pCont = NULL; taosMemoryFree(pTransMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); @@ -3440,7 +3499,7 @@ int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { int32_t totalReq1 = transQueueSize(&args1->reqsToSend) + transQueueSize(&args1->reqsSentOut); int32_t totalReq2 = transQueueSize(&args2->reqsToSend) + transQueueSize(&args2->reqsSentOut); - if ( totalReq1 > totalReq2) { + if (totalReq1 > totalReq2) { return 0; } return 1; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5669202238..7466cd9fbc 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -667,7 +667,7 @@ void transPrintEpSet(SEpSet* pEpSet) { len += snprintf(buf + len, sizeof(buf) - len, "}"); tTrace("%s, inUse:%d", buf, pEpSet->inUse); } -bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { +bool transRepEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) { if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) { return false; } @@ -678,7 +678,7 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } return true; } -bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) { +bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) { if (a->numOfEps != b->numOfEps) { return false; } @@ -934,4 +934,46 @@ int32_t transSetReadOption(uv_handle_t* handle) { } code = taosSetSockOpt2(fd); return code; +} + +int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) { + if (pEpset == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + if (pReqEpSet == NULL) { + return TSDB_CODE_INVALID_PARA; + } + taosMemoryFree(*pReqEpSet); + + int32_t size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps; + SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size); + if (pReq == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pReq->inUse = pEpset->inUse; + pReq->numOfEps = pEpset->numOfEps; + for (int32_t i = 0; i < pEpset->numOfEps; i++) { + pReq->eps[i].port = pEpset->eps[i].port; + strcpy(pReq->eps[i].fqdn, pEpset->eps[i].fqdn); + } + + *pReqEpSet = pReq; + return TSDB_CODE_SUCCESS; +} + +int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) { + if (pReqEpSet == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + pEpSet->inUse = pReqEpSet->inUse; + pEpSet->numOfEps = pReqEpSet->numOfEps; + for (int32_t i = 0; i < pReqEpSet->numOfEps; i++) { + pEpSet->eps[i].port = pReqEpSet->eps[i].port; + strcpy(pEpSet->eps[i].fqdn, pReqEpSet->eps[i].fqdn); + } + + return TSDB_CODE_SUCCESS; } \ No newline at end of file