diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 651b379851..c6f7b5cf2e 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -199,6 +199,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); +void destroyAhandle(void* ahandle); + int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* ctx); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 53edcd4fee..8a0cccc71f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -72,6 +72,7 @@ typedef struct SRpcMsg { typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); +typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; @@ -97,6 +98,9 @@ typedef struct SRpcInit { // set up timeout for particular msg RpcTfp tfp; + // destroy client ahandle; + RpcDfp dfp; + void *parent; } SRpcInit; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bb25a595af..bb93e4d934 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -140,12 +140,13 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; rpcInit.rfp = clientRpcRfp; - // rpcInit.tfp = clientRpcTfp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.compressSize = tsCompressMsgSize; + rpcInit.dfp = destroyAhandle; + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index cf064881c2..6eadf80e3d 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -146,6 +146,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } taosMemoryFreeClear(pMsgBody); } +void destroyAhandle(void *ahandle) { + SMsgSendInfo *pSendInfo = ahandle; + if (pSendInfo == NULL) return; + + destroySendMsgInfo(pSendInfo); +} int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* rpcCtx) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index b9167501e2..3b7182f983 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -53,6 +53,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); + void (*destroyFp)(void* ahandle); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index d3db4879d1..feb55985de 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -53,6 +53,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; + pRpc->destroyFp = pInit->dfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 21444018cd..42dc65abea 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -143,6 +143,7 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); +static void cliReleaseUnfinishedMsg(SCliConn* conn); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); @@ -163,17 +164,6 @@ static void destroyThrdObj(SCliThrd* pThrd); static void cliWalkCb(uv_handle_t* handle, void* arg); -static void cliReleaseUnfinishedMsg(SCliConn* conn) { - for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { - SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); - if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { - if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { - conn->ctx.freeFunc(msg->ctx->ahandle); - } - } - destroyCmsg(msg); - } -} #define CLI_RELEASE_UV(loop) \ do { \ uv_walk(loop, cliWalkCb, NULL); \ @@ -266,6 +256,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { static void* cliWorkThread(void* arg); +static void cliReleaseUnfinishedMsg(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { + SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); + if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { + if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { + conn->ctx.freeFunc(msg->ctx->ahandle); + continue; + } + if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { + tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); + pTransInst->destroyFp(msg->ctx->ahandle); + } + } + destroyCmsg(msg); + } +} bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL;