From 188883341a5f16522119e3516e4bf2492ca1c1fe Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 31 Jul 2024 20:26:51 +0800 Subject: [PATCH] add new interface --- include/libs/transport/trpc.h | 21 ++-- source/libs/scheduler/src/schRemote.c | 23 ++-- source/libs/transport/inc/transComm.h | 31 ++--- source/libs/transport/src/trans.c | 24 ++-- source/libs/transport/src/transCli.c | 172 +++++++++++++++++++++----- source/libs/transport/src/transComm.c | 5 +- source/libs/transport/src/transSvr.c | 8 +- 7 files changed, 209 insertions(+), 75 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6c0d04354a..54d468a370 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -158,18 +158,21 @@ void *rpcReallocCont(void *ptr, int64_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side // Please use tmsg functions, which are defined in tmsgcb.h -int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -int rpcSendResponse(const SRpcMsg *pMsg); -int rpcRegisterBrokenLinkArg(SRpcMsg *msg); -int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock +int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +int32_t rpcSendResponse(const SRpcMsg *pMsg); +int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg); +int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, +int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, int32_t timeoutMs); -int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void *rpcAllocHandle(); + +int32_t rpcFreeConnById(void *shandle, int64_t connId); + +int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +int32_t rpcAllocHandle(int64_t *refId); int32_t rpcSetIpWhite(void *thandl, void *arg); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 08a8f684f5..9215254f9c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -17,11 +17,11 @@ #include "command.h" #include "query.h" #include "schInt.h" +#include "tglobal.h" +#include "tmisce.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" -#include "tglobal.h" -#include "tmisce.h" // clang-format off int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { @@ -975,11 +975,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask)); if (isHb && persistHandle && trans->pHandle == 0) { - trans->pHandle = rpcAllocHandle(); - if (NULL == trans->pHandle) { - SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno); - SCH_ERR_JRET(terrno); + int64_t refId = 0; + code = rpcAllocHandle(&refId); + if (code != 0) { + SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code); + SCH_ERR_JRET(code); } + trans->pHandle = (void *)refId; } if (pJob && pTask) { @@ -1200,7 +1202,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } persistHandle = true; - SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + int64_t refId = 0; + code = rpcAllocHandle(&refId); + if (code != 0) { + SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code); + SCH_ERR_JRET(code); + } + + SCH_SET_TASK_HANDLE(pTask, (void *)refId); break; } case TDMT_SCH_FETCH: diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e66941244c..60996398d4 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -207,7 +207,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; +typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -304,10 +304,10 @@ int32_t transClearBuffer(SConnBuffer* buf); int32_t transDestroyBuffer(SConnBuffer* buf); int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); +int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); -int transSetConnOption(uv_tcp_t* stream, int keepalive); +int32_t transSetConnOption(uv_tcp_t* stream, int keepalive); void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); @@ -315,21 +315,24 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); -int transReleaseCliHandle(void* handle); -int transReleaseSrvHandle(void* handle); +int32_t transReleaseCliHandle(void* handle); +int32_t transReleaseSrvHandle(void* handle); -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, +int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs); -int transSendResponse(const STransMsg* msg); -int transRegisterMsg(const STransMsg* msg); -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId); +int32_t transFreeConnById(void* shandle, int64_t transpointId); + +int32_t transSendResponse(const STransMsg* msg); +int32_t transRegisterMsg(const STransMsg* msg); +int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); -int transSockInfo2Str(struct sockaddr* sockname, char* dst); +int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst); -int64_t transAllocHandle(); +int32_t transAllocHandle(int64_t* refId); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index fbcc74e8e1..21c16afb28 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -163,38 +163,44 @@ void* rpcReallocCont(void* ptr, int64_t contLen) { return st + TRANS_MSG_OVERHEAD; } -int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { +int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { return transSendRequest(shandle, pEpSet, pMsg, NULL); } -int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { +int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { return transSendRequest(shandle, pEpSet, pMsg, pCtx); } -int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { + +int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { + return transSendRequestWithId(shandle, pEpSet, pReq, transpointId); +} + +int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { return transSendRecv(shandle, pEpSet, pMsg, pRsp); } -int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, +int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); } +int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); } -int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } +int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); } void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } -int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } +int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } +int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } // client only -int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { +int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later return transSetDefaultAddr(thandle, ip, fqdn); } // server only int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); } -void* rpcAllocHandle() { return (void*)transAllocHandle(); } +int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); } int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); } int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2e61f19af8..1fcd5fd0de 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -213,8 +213,10 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, - cliHandleUpdate}; +static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd); + +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, + NULL, cliHandleUpdate, cliHandleFreeById}; /// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, /// NULL,cliHandleUpdate}; @@ -1589,6 +1591,36 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { pThrd->cvtAddr = pCtx->cvtAddr; destroyCmsg(pMsg); } +static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code = 0; + int64_t refId = (int64_t)(pMsg->msg.info.handle); + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("%" PRId64 " already released", refId); + destroyCmsg(pMsg); + return; + } + + taosRLockLatch(&exh->latch); + SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + + if (conn->refId != refId) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except); + } + + int32_t size = transQueueSize(&conn->cliMsgs); + if (size == 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except); + return; + } else { + while (T_REF_VAL_GET(conn) >= 1) transUnrefCliHandle(conn); + } + return; +_except: + (void)transReleaseExHandle(transGetRefMgt(), refId); + destroyCmsg(pMsg); +} SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { STransConnCtx* pCtx = (*pMsg)->ctx; @@ -2759,7 +2791,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle); return pThrd; } -int transReleaseCliHandle(void* handle) { +int32_t transReleaseCliHandle(void* handle) { int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { @@ -2823,25 +2855,25 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; QUEUE_INIT(&cliMsg->seqq); + *pCliMsg = cliMsg; return 0; } -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _except;); } + if (handle != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh != NULL) { @@ -2849,26 +2881,27 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (exh->handle == NULL && exh->inited != 0) { SCliMsg* pCliMsg = NULL; code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); - ASSERT(code == 0); + if (code != 0) { + taosWUnLockLatch(&exh->latch); + (void)transReleaseExHandle(transGetRefMgt(), handle); + TAOS_CHECK_GOTO(code, NULL, _except); + } QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; + } else { + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + (void)transReleaseExHandle(transGetRefMgt(), handle); } - exh->inited = 1; - taosWUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), handle); } } SCliMsg* pCliMsg = NULL; - code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); - if (code != 0) { - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return code; - } + TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _except); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -2880,13 +2913,62 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran } (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; + +_except: + transFreeMsg(pReq->pCont); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; +} +int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { + if (transpointId == NULL) { + return TSDB_CODE_INVALID_PARA; + } + int32_t code = 0; + + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _except); + } + + TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _except); + + SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _except); + } + + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), *transpointId); + if (exh == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _except); + } + + pReq->info.handle = (void*)(*transpointId); + + SCliMsg* pCliMsg = NULL; + TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _except); + + STraceId* trace = &pReq->info.traceId; + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { + destroyCmsg(pCliMsg); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + } + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + +_except: + transFreeMsg(pReq->pCont); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { +int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; @@ -2908,8 +2990,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs code = tsem_init(sem, 0, 0); if (code != 0) { taosMemoryFree(sem); - code = TAOS_SYSTEM_ERROR(errno); - TAOS_CHECK_GOTO(code, NULL, _RETURN1); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1); } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -3003,13 +3084,13 @@ _EXIT: taosMemoryFree(pSyncMsg); return code; } -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, - int32_t timeoutMs) { +int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs) { int32_t code = 0; STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); @@ -3096,12 +3177,11 @@ _RETURN2: /* * **/ -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { +int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } - SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip)); @@ -3145,7 +3225,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { return code; } -int64_t transAllocHandle() { +int32_t transAllocHandle(int64_t* refId) { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); if (exh == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3166,5 +3246,39 @@ int64_t transAllocHandle() { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); - return exh->refId; + *refId = exh->refId; + return 0; +} +int32_t transFreeConnById(void* shandle, int64_t transpointId) { + int32_t code = 0; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + return TSDB_CODE_RPC_MODULE_QUIT; + } + if (transpointId == 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except); + } + + SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except); + } + + SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (pCli == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _except); + } + pCli->type = FreeById; + + STransMsg msg = {.info.handle = (void*)transpointId}; + + code = transAsyncSend(pThrd->asyncPool, &pCli->q); + if (code != 0) { + taosMemoryFree(pCli); + TAOS_CHECK_GOTO(code, NULL, _except); + } + +_except: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 9df0ddb6f3..148f4d4e9a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -234,7 +234,7 @@ bool transReadComplete(SConnBuffer* connBuf) { return (p->left == 0 || p->invalid) ? true : false; } -int transSetConnOption(uv_tcp_t* stream, int keepalive) { +int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) { #if defined(WINDOWS) || defined(DARWIN) #else return uv_tcp_keepalive(stream, 1, keepalive); @@ -745,8 +745,7 @@ int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) { return taosRemoveRef(refMgt, refId); } -void* transAcquireExHandle(int32_t refMgt, int64_t refId) { - // acquire extern handle +void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle return (void*)taosAcquireRef(refMgt, refId); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1e0d54eb5b..0202fbc599 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1707,7 +1707,7 @@ void transUnrefSrvHandle(void* handle) { } } -int transReleaseSrvHandle(void* handle) { +int32_t transReleaseSrvHandle(void* handle) { int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; @@ -1747,7 +1747,7 @@ _return2: return code; } -int transSendResponse(const STransMsg* msg) { +int32_t transSendResponse(const STransMsg* msg) { int32_t code = 0; if (msg->info.noResp) { @@ -1800,7 +1800,7 @@ _return2: rpcFreeCont(msg->pCont); return code; } -int transRegisterMsg(const STransMsg* msg) { +int32_t transRegisterMsg(const STransMsg* msg) { int32_t code = 0; SExHandle* exh = msg->info.handle; @@ -1891,4 +1891,4 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { return code; } -int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } +int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }