From 3cc9979a9979ced5d9816fc6998e0d6b308a5ed2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 09:17:56 +0800 Subject: [PATCH 1/6] handle except --- include/libs/transport/trpc.h | 13 +-- source/client/src/clientEnv.c | 1 - source/libs/qcom/src/queryUtil.c | 4 + source/libs/transport/inc/transComm.h | 12 ++- source/libs/transport/src/trans.c | 2 - source/libs/transport/src/transCli.c | 75 ++++++++------ source/libs/transport/src/transSrv.c | 135 ++++++++++++++++++-------- source/libs/transport/test/transUT.cc | 23 +---- 8 files changed, 163 insertions(+), 102 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6bc28e3ea0..d8dcf72bed 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -42,9 +42,10 @@ typedef struct SRpcMsg { void * pCont; int contLen; int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client - int noResp; // has response or not(default 0 indicate resp); + void * handle; // rpc handle returned to app + void * ahandle; // app handle set by client + int noResp; // has response or not(default 0 indicate resp); + int persistHandle; // persist handle or not } SRpcMsg; @@ -69,15 +70,9 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // call back to keep conn or not - bool (*pfp)(void *parent, tmsg_t msgType); - // to support Send messages multiple times on a link void *(*mfp)(void *parent, tmsg_t msgType); - // call back to handle except when query/fetch in progress - bool (*efp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 525c5f9fb8..2e9a707dd3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.label = "TSC"; rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; - rpcInit.pfp = persistConnForSpecificMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 63fbf59c06..b39d3e6e37 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp .ahandle = (void*)pInfo, .handle = pInfo->msgInfo.handle, .code = 0}; + if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || + pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { + rpcMsg.persistHandle = 1; + } assert(pInfo->fp != NULL); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8ea65b193d..a60531a429 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -150,11 +150,12 @@ typedef struct { typedef struct { char version : 4; // RPC version - char comp : 4; // compression algorithm, 0:no compression 1:lz4 - char resflag : 2; // reserved bits - char spi : 1; // security parameter index + char comp : 2; // compression algorithm, 0:no compression 1:lz4 + char noResp : 2; // noResp bits, 0: resp, 1: resp + char persist : 2; // persist handle,0: no persit, 1: persist handle + char release : 2; char secured : 2; - char encrypt : 3; // encrypt algorithm, 0: no encryption + char spi : 2; uint32_t code; // del later uint32_t msgType; @@ -179,6 +180,9 @@ typedef struct { #pragma pack(pop) +typedef enum { Normal, Quit, Release } STransMsgType; +typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus; + #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 2cab03f133..a688e9981e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,9 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->pfp = pInit->pfp; pRpc->mfp = pInit->mfp; - pRpc->efp = pInit->efp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 31097a591f..7100c34845 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,11 +17,6 @@ #include "transComm.h" -// Normal(default): send/recv msg -// Quit: quit rpc inst -// Release: release handle to rpc inst -typedef enum { Normal, Quit, Release } SCliMsgType; - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -36,7 +31,8 @@ typedef struct SCliConn { int hThrdIdx; bool broken; // link broken or not - int persist; // + ConnStatus status; // + int release; // 1: release // spi configure char spi; char secured; @@ -55,7 +51,7 @@ typedef struct SCliMsg { STransMsg msg; queue q; uint64_t st; - SCliMsgType type; + STransMsgType type; } SCliMsg; typedef struct SCliThrdObj { @@ -113,10 +109,12 @@ static void cliSend(SCliConn* pConn); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); + // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -133,6 +131,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + if (T_REF_VAL_GET(conn) == 1) { \ + SCliThrdObj* thrd = conn->hostThrd; \ + addConnToPool(thrd->pool, conn); \ + } \ + goto _RETURN; \ + } \ + } while (0) + #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -151,14 +163,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ - if (conn->persist == false) { \ - conn->persist = true; \ + if (conn->status == ConnNormal) { \ + conn->status = ConnAcquire; \ transRefCliHandle(conn); \ } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) +#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) static void* cliWorkThread(void* arg); @@ -177,7 +190,6 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -185,6 +197,8 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.ahandle = NULL; + CONN_SHOULD_RELEASE(conn, pHead); + SCliMsg* pMsg = NULL; if (taosArrayGetSize(conn->cliMsgs) > 0) { pMsg = taosArrayGetP(conn->cliMsgs, 0); @@ -200,9 +214,8 @@ void cliHandleResp(SCliConn* conn) { // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); - if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) { + if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.handle = conn; - CONN_SET_PERSIST_BY_APP(conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } @@ -241,6 +254,8 @@ void cliHandleResp(SCliConn* conn) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } +_RETURN: + return; } void cliHandleExcept(SCliConn* pConn) { @@ -367,6 +382,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->status = ConnNormal; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -423,8 +439,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; - conn->persist = false; - conn->broken = false; + conn->status = ConnNormal; + conn->broken = 0; transRefCliHandle(conn); return conn; } @@ -513,7 +529,9 @@ void cliSend(SCliConn* pConn) { msgLen += sizeof(STransUserMsg); } - pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -522,6 +540,9 @@ void cliSend(SCliConn* pConn) { TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + if (pHead->persist == 1) { + CONN_SET_PERSIST_BY_APP(pConn); + } pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); @@ -571,12 +592,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { } transDestroyBuffer(&conn->readBuf); - if (conn->persist && T_REF_VAL_GET(conn) >= 2) { - conn->persist = false; + conn->status = ConnRelease; + int ref = T_REF_VAL_GET(conn); + if (ref == 2) { transUnrefCliHandle(conn); + } else if (ref == 1) { addConnToPool(pThrd->pool, conn); - } else { - transUnrefCliHandle(conn); } } @@ -652,14 +673,10 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - - if (pMsg->type == Normal) { - cliHandleReq(pMsg, pThrd); - } else if (pMsg->type == Quit) { - cliHandleQuit(pMsg, pThrd); - } else if (pMsg->type == Release) { - cliHandleRelease(pMsg, pThrd); + if (pMsg == NULL) { + continue; } + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } if (count >= 2) { @@ -802,8 +819,8 @@ void transReleaseCliHandle(void* handle) { STransMsg tmsg = {.handle = handle}; SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); - cmsg->type = Release; cmsg->msg = tmsg; + cmsg->type = Release; transSendAsync(thrd->asyncPool, &cmsg->q); } @@ -833,6 +850,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); @@ -858,6 +876,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 2efdb109aa..321a3489b7 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,6 +35,7 @@ typedef struct SSrvConn { bool broken; // conn broken; + ConnStatus status; struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -47,18 +48,18 @@ typedef struct SSrvConn { } SSrvConn; typedef struct SSrvMsg { - SSrvConn* pConn; - STransMsg msg; - queue q; + SSrvConn* pConn; + STransMsg msg; + queue q; + STransMsgType type; } SSrvMsg; typedef struct SWorkThrdObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_os_fd_t fd; - uv_loop_t* loop; - SAsyncPool* asyncPool; - + pthread_t thread; + uv_pipe_t* pipe; + uv_os_fd_t fd; + uv_loop_t* loop; + SAsyncPool* asyncPool; queue msg; pthread_mutex_t msgMtx; @@ -113,6 +114,11 @@ static void destroySmsg(SSrvMsg* smsg); static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease}; + static void uvDestroyConn(uv_handle_t* handle); // server and worker thread @@ -217,7 +223,6 @@ static void uvHandleReq(SSrvConn* pConn) { if (pHead->secured == 1) { pHead->msgLen -= sizeof(STransUserMsg); } - // } STransMsg transMsg; @@ -230,24 +235,32 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; - - if (pHead->resflag == 0) { + if (pConn->status == ConnNormal) { + if (pHead->persist == 1) { + pConn->status = ConnAcquire; + transRefSrvHandle(pConn); + } + } + if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - transMsg.handle = pConn; tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + // no ref here + } + + if (pHead->noResp == 0) { + transMsg.handle = pConn; } STrans* pTransInst = (STrans*)p->shandle; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth - // validate msg type } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -272,7 +285,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - uvNotifyLinkBrokenToApp(conn); + // uvNotifyLinkBrokenToApp(conn); // STrans* pTransInst = conn->pTransInst; // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { @@ -301,8 +314,11 @@ void uvOnSendCb(uv_write_t* req, int status) { SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); taosArrayRemove(conn->srvMsgs, 0); + if (msg->type == Release && conn->status != ConnNormal) { + conn->status = ConnNormal; + transUnrefSrvHandle(conn); + } destroySmsg(msg); - // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); @@ -339,6 +355,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { pHead->secured = pMsg->code == 0 ? 1 : 0; // pHead->msgType = smsg->pConn->inType + 1; + pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); // add more info char* msg = (char*)pHead; @@ -371,10 +388,12 @@ static void uvStartSendResp(SSrvMsg* smsg) { transUnrefSrvHandle(pConn); return; } - transUnrefSrvHandle(pConn); + if (pConn->status == ConnNormal) { + transUnrefSrvHandle(pConn); + } if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), + tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; @@ -408,6 +427,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); + } transUnrefSrvHandle(c); } } @@ -431,20 +453,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("unexcept occurred, continue"); continue; } - if (msg->pConn == NULL) { - free(msg); - bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); - if (noConn == true) { - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); - } else { - destroyAllConn(pThrd); - // uv_loop_close(pThrd->loop); - pThrd->quit = true; - } - } else { - uvStartSendResp(msg); - } + (*transAsyncHandle[msg->type])(msg, pThrd); } } static void uvAcceptAsyncCb(uv_async_t* async) { @@ -633,6 +642,7 @@ static SSrvConn* createConn(void* hThrd) { tTrace("conn %p created", pConn); pConn->broken = false; + pConn->status = ConnNormal; transRefSrvHandle(pConn); return pConn; @@ -748,7 +758,38 @@ End: transCloseServer(srv); return NULL; } - +void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { + if (QUEUE_IS_EMPTY(&thrd->conn)) { + uv_loop_close(thrd->loop); + uv_stop(thrd->loop); + } else { + destroyAllConn(thrd); + thrd->quit = true; + } + free(msg); +} +void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { + // release handle to rpc init + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + } + taosArrayPush(conn->srvMsgs, &msg); + uvStartSendRespInternal(msg); + return; + } else if (conn->status == ConnRelease) { + // already release by server app, do nothing + } else if (conn->status == ConnNormal) { + // no nothing + // user should not call this rpcRelease handle; + } + free(msg); +} +void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { + // send msg to client + uvStartSendResp(msg); +} void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; @@ -759,10 +800,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { - SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + SSrvMsg* msg = calloc(1, sizeof(SSrvMsg)); + msg->type = Quit; tDebug("server send quit msg to work thread"); - - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &msg->q); } void transCloseServer(void* arg) { @@ -813,8 +854,21 @@ void transUnrefSrvHandle(void* handle) { } void transReleaseSrvHandle(void* handle) { - // do nothing currently - // + if (handle == NULL) { + return; + } + SSrvConn* pConn = handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + STransMsg tmsg = {.handle = handle, .code = 0}; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + + tTrace("server conn %p start to release", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); } void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { @@ -826,6 +880,7 @@ void transSendResponse(const STransMsg* pMsg) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); srvMsg->pConn = pConn; srvMsg->msg = *pMsg; + srvMsg->type = Normal; tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index ec89d695a2..31015359f4 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -31,11 +31,6 @@ class Server; int port = 7000; // server process -static bool cliPersistHandle(void *parent, tmsg_t msgType) { - // client persist handle - return msgType == 2 || msgType == 4; -} - typedef struct CbArgs { tmsg_t msgType; } CbArgs; @@ -93,7 +88,6 @@ class Client { } void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.pfp = pfp; this->transCli = rpcOpen(&rpcInit_); } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { @@ -103,8 +97,6 @@ class Client { } void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - - rpcInit_.pfp = pfp; rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -149,7 +141,6 @@ class Server { rpcInit_.label = (char *)label; rpcInit_.numOfThreads = 5; rpcInit_.cfp = processReq; - rpcInit_.efp = NULL; rpcInit_.user = (char *)user; rpcInit_.secret = (char *)secret; rpcInit_.ckey = (char *)ckey; @@ -167,7 +158,6 @@ class Server { } void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { this->Stop(); - rpcInit_.efp = efp; this->Start(); } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { @@ -358,10 +348,10 @@ TEST_F(TransEnv, clientUserDefined) { } TEST_F(TransEnv, cliPersistHandle) { - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle, .noResp = 0}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -378,11 +368,9 @@ TEST_F(TransEnv, cliPersistHandle) { } TEST_F(TransEnv, cliReleaseHandle) { - tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -398,7 +386,7 @@ TEST_F(TransEnv, cliReleaseHandle) { ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { @@ -431,7 +419,7 @@ TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; @@ -450,7 +438,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) { } TEST_F(TransEnv, cliPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; From 3b147dd5bc9db5e9cd75418048dd0a5ea4b62db2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 22:00:20 +0800 Subject: [PATCH 2/6] handle except --- include/libs/transport/trpc.h | 30 +++--- source/libs/transport/inc/transComm.h | 37 +++++--- source/libs/transport/inc/transportInt.h | 3 - source/libs/transport/src/.transCli.c.swn | Bin 0 -> 49152 bytes source/libs/transport/src/trans.c | 12 ++- source/libs/transport/src/transCli.c | 17 +++- source/libs/transport/src/transComm.c | 56 ++++++++++- source/libs/transport/src/transSrv.c | 20 ++-- source/libs/transport/test/transUT.cc | 37 +------- source/libs/transport/test/transportTests.cc | 94 +++++++++++++++++++ 10 files changed, 225 insertions(+), 81 deletions(-) create mode 100644 source/libs/transport/src/.transCli.c.swn diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index d8dcf72bed..aae0c6bd22 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -70,12 +70,19 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // to support Send messages multiple times on a link - void *(*mfp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; +typedef struct { + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcCtxVal; + +typedef struct { + SHashObj *args; +} SRpcCtx; + int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); @@ -84,16 +91,17 @@ void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); - +void rpcReleaseHandle(void *handle, int8_t type); // void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a60531a429..a939bbd644 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -14,6 +14,10 @@ */ #ifdef USE_UV +#ifdef __cplusplus +extern "C" { +#endif + #include #include "lz4.h" #include "os.h" @@ -121,24 +125,21 @@ typedef struct { } SRpcReqContext; typedef SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - // int32_t code; // error code - // int16_t numOfTry; // number of try for different servers - // int8_t oldInUse; // server EP inUse passed by app - // int8_t redirect; // flag to indicate redirect + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + tmsg_t msgType; // message type int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -181,7 +182,7 @@ typedef struct { #pragma pack(pop) typedef enum { Normal, Quit, Release } STransMsgType; -typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus; +typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) @@ -259,7 +260,7 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); @@ -270,4 +271,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void transCloseClient(void* arg); void transCloseServer(void* arg); +void transCtxInit(STransCtx* ctx); +void transCtxDestroy(STransCtx* ctx); +void transCtxClear(STransCtx* ctx); +void transCtxMerge(STransCtx* dst, STransCtx* src); +void* transCtxDumpVal(STransCtx* ctx, int32_t key); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e739380467..1395408960 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,9 +63,6 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - bool (*pfp)(void* parent, tmsg_t msgType); - void* (*mfp)(void* parent, tmsg_t msgType); - bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/.transCli.c.swn b/source/libs/transport/src/.transCli.c.swn new file mode 100644 index 0000000000000000000000000000000000000000..583fbc9f74257c1841caaac6e0c3890d57c227da GIT binary patch literal 49152 zcmeI537A}0b?2J^#w>P7SPaa7pJgPfrPk^d%aUoeT2{wWOYTLsuuW54{kpqwbycaV zS_@tP1BrvF?mYPErx zZ$7@NPk&X{d-vVt+;h)8=iGDeo8EL_WN&othRuC^9@*FTy74oo-*@}x`~J(b`}!KS zT1%IY_nAF&_MEexI&=29XFPRwxwIQ6cE;7Yay8yKS8k<0nzhA7Dc)EqPd7KV8pUd} zUTd^AHX9}PYp7D*P+BhA-n||LdK5Sw1r}Sg>(4!@?~E;*H!11qt4@m^_1lxjd#ZQ8 zM}Zy%dKBnUphtlo1$q?dQJ_bG9tD1lDA1}utnWFr=Qn(tj{5Hhw0%Fw|1SFPV{P{z z?0@g_-(P6E|8W0%wf{cN7ZA#MsJ~wI-@|S9Pw>~D;JDtnjp|6u?B>^J-RZUg@vw7^>Mt&{uuz7AdrE&^-7>ELrG z_4WM`xBii3#H0Md>Nm_-+(^> zF9X+tLGVCu8~wWnTm;nDPXa6WG+V`1xfC6)m1m+-r92*2XD(hWw+iD#WlYRBX7)`# zYi-oxkN&N+HIulomgIxYW7X2`Vs)kxUv@>mJ+!wuw>GNt|3H+>qpsZgqixYuJm(%6 ziw_N`Q1?s4W`2&vmAKf9ui2U_HA(Gnub{{-86>S-zQSFers^vydgI1u-pCU*;zQAF zqqYzg>-9V(lQs<>EyZ<5`k`92x;85DKXgwsi7iD`oUSdl>~1&WH0xHq_H62nS}ATe zqnTo>NR5g!u1d|gaX4-aM8%nzXeKVz7U~V9id&_qQf$SIe0?K*sI+(`Q%YThQQIt& zgrqy;W~)&l=DgSgm;Ho}JbZk6hR(zQlZt+m!} zOmh|^dQ$KQ?QX?M#>+A}tuz)NnpV||;`XjJC#^DdNlW|{ms(0p#}7p1YAX_97Mrd= zOT|iMx>&k0D$YV*>&hr<)gspuIU0IOyL^u{%B^^%xmse^*UGw9x-xFHXFV2|4!f-B zp}4r9yeV-YYBtNyrlhDUHep=Ws?Ame5vCVsm4>g5Epc2`oj%rzo1VEkmOEIf)JiIR z$Kvd4+;CNB=M8G%D$!h=n}dbonJHZu=u{5~S}Unq(JTzmAm{d`PLvm7#*Hhg#gB#> z;>vVwX~c77N`aenmrje~VrgFDPeTP7I$n#9*2|69jlWzW#_kLv@xerGKU`XnU_~DC zr;~@el?-e@R8sbwi(5*t3pugm>C$|$u{J8#X+7L@#uoZUadat{yP{Nw88_UNRa-Wl zQkRkeVTp}Zp|T^1M1pUxh$6XzJc~-jda+b)9g8aEg>p+0szhPd&c42Dko^4G11-1q zMbV+fc+pX7YhUYFJ)VhYBbaTm)JoGuSGmV3WhNXwl$|@EAET5m@b^^1^iy42*f;C$ zH}zy7j!RLl&csJ`S>#I}suZW=%4HMdJ9kX&8Qd|vXKK&z=oKBRXZ1-d=Y@sxGZ~}| z4>51)LPGxfgkl^=7%yA7y>TP=)MIC#snHZ4ZFxTACS=nEV>c+ZWMVs4OxinC=J1r~engHKUD=GAoD&1AjZDtz zPu(DOrBp@$}$4+V*+j<2mZH`0+Gm-PnLPy75+bz zd){64gc!G6#dt%lqYN=oZ3(+o$`qScNU70lRG9IjWF#Elm}2N7PV}b=>~J^Sg8Vb_ z^x~Y2ib`z`_M5FK$J`)QCT{#jFeo(izmfN)#YQ|6?nljfS$~r&e`!q3mgh*or3}+I zoNDBHcJp}%J5!ZNr1p&4G((*=+jtJ-mM2f^XvD{eli^`qn{L#ujH@owO1XNazYw|U zv@chKD4JL2j?5TmCfu!ToUjm&t06h2r8!Vryw+$GkHJs+Fxk*0HgdD9Ytxn`R-4=; z5!vLz-@2YlJYQ?JR3Y7fVf`#g?)DeTn!l0GaqcgFSrxO{+N79IJ~1^mylZOb@X(&Y zvEf30Ctgz3#7w7`tEI}~OdPGU@aCb~!or66Rk}VaVK+HGJT-ZMK3|J0V0cw()w$?M zxuuz<+>B7l<{HI?4bjM~6c;AFIF8PrZ?)>@ZQOX|$dL_m)x{09#vB3ErMTLRn;W-p zP(BB1i&1mFwpf{A5<47g&Wp>3B*u==tByErOPmb;1H$H4-4vrp-E*Tsf8yuZD zxYgRA86iF#SFNn_LcLO^21knYaFywSI_w=D8`@3!!5t%eMkWrzPbzI9)A9%+%rIFr z%F{@_8O^(@_F(DwSwDjuxBo>>_h1DWgk9CU)wdPR}zoh_BK8nCXbH!8~ zh)|)MWE7pzseQ95w&{yKFFHowm5Nn0XMm1NTTi8{GaG5kLT#ozd(5|Oi2iJp=jK~c zq0}F3*|hno(Zt}s@n|P{{6I8PEp3R-f7X1fSbG|1O1s~H7Qel3_a0t{u6+9Gt1-tx>;3c2}=D{3T4;}%&hmHOja4UE{I0k0G zFxUwm4erBc|2DV{ya~J!Tn^TOlfh4+@i)Qez@6Z;Ky?2&=zV$==ux0YfnSOOr{i)d z&tfW}=V(SL)W_om3^kPKX1p-v?Y_3FM~e9K@(EGqbpdZ&9c6j_cKxtiWVy(%W7a?c z-F{~M_QV_*h}IOGy0(7%p?)hiYo;-2HfG4OU=ABO4*OG6X#I9PG~U*xJd`zO&s}B) z8m1R*2zg6uC!f(;W1(m^NVtIqVq2sc?!wrtD0JL~`ed_URSAhO2S~NH?$x+(8I_mI zYOWfpdZTuluR|8WHfCcv5DJ@}&!IMJjo_Br8R%QCijoP%2BJ;;v%>wwv1VO`@TEf1 zl%4Xo9&0%=x|BhuCUs)A`aCf7l`&sw#$}a-)-qK43X*QVsN>hymQ8<2tKYZeiqy}mOm^(QIM_eLbI+`*vL!@`860tG*@i#D;h=E zwZ&F#0ox<7DvM2go{Xxb^VV;dIp!F6o2j`|#tB1vMutZx+-P*80#CYQ&$T5ts@>)P z3&RL1EI*7a!LMK#A;@>iolscSgh{asFrblo*gyzZB<6XmXQI`(0B4%5^P<&Fey);p znISje7hU8a^nSr8azhfTW0-kYKI4qRXDo`HYL0#?U(kvTav6&j1JwPX8(MqfDpG&8 z5?A|^L29l?U1FGR5_9}N8u;ltK){Vw$YqI^+4YRAtF+$`H*M_0!%hE_KvxSVTxb)C^P8`*$K>N_&Z>|ze!r6u4$ z2u1OrD+$N5l@uk;p$^ZbLn2R0mz$~?sF_k`l4X|J<>@jxiXUlbp<7)>Mu$4qMfU#_ zQ95r`AUpPd(JP>z#K!*wcr|zxI1C2C8Q^4a7dHK!;CI0|co29scKp-9W57RPx8DGE zfd_yOV5c7evfF=vEv~ir5|{)NU=8?J?C`$>Hv`$_PXk-PW^gk29(MR0;4i>+U<&L9 z`@mDc&#}Qj2i^gG4`^-uVsH<(_ub%w;5p!0a0xg7WSifI{rx@g1@I|wD|jFHD{vFI z5}XgV0 zgU5ic(GMR4>W>eAiX@P4Y*=_iGZJ8oPA0&T0o~SR&>if zCx@6VTv;xNfDI<$lTnsj&S-?1QZMkjYq)%#v8*w66jz$D>1)1j$~B}zMp<=RM|UnR z)DILZp|(uW8nHA*$q_@E-;>cRaURA+v}R3onhns=eN+2~$Hqq(qB{;w4esAxP|N*1 z)aLE?ihyNA<;f2~oXXqL!3)!dm zsRyBCA*~A+t=ki2Iw_V%uiC^+1o2gt)!gMc->OwB9rEz$d~0&4bS+46*hd#KiR6*xWC&%dS18milj&JC(77BWg4_gu zU~tbAdW}o$3qV7Oj?9-Uaa7pUZ#2I~gkSb^d7ry&BWf%=(_1yPN@t&tPFK^qS=9zh zEHpP_m(-=rB4#N?wS$7yR@GaVC7?HJp4(eSA6PHlA95f z$I=O*a~7@-bx(cXRUBnUK&OmCO-%A=?Njo+>%>0K?1(q7J)>ME%ju%+4`zRdO!5-T zWLvOzv&lRE?J^NX3RYL?!4~{r!dZgbt_S6TcvD=YXk_?v*Sf~t52@I>3jS)_nCYmy zS&7T#(w2!#=!{8fY(`5HU#9+KDrTOwxLl{TFo^}-LX&8skM#4)!kp4sG_iX#UR5vm z>2K5U7S-t68iep4^N#OrJS~gKH}AuEEOXc{}vu25$06Oylqi!`dQ(T)K<^otUyW0hhAl1w@e`jM}YqI;MJG9L4wd1g8^O`gqn691`I{8N>7 zt-=cI8YRg7e=BzFSFnR+|L^h!=jXBUZv{_j1r*=^SRlVZAGi~rz{kMff{%b#fulfc{Y|hM+>L+WHQ+3; z85F>iz<&n+3H%uUz(>G!;F(|y>;bz$1n$8B{0AQbw}3hDB=7+6Q~3nIP2ibe3Jih^z;^Iha1XwK z+rW!~d<2`pJ@^5>2yOwt4-SC!;L+eG_yoQQ?gBdF;0~aV*xb3s(r*QA#lgIfGdOl+p zFs-@Vb`DMq7Ut7nS6fzbHlH4`fG`XFnol0t&EigBP2D;EeJ#9(eVWU3WPECP@BWE{ zX)0F7GI+a)UeVi-+RoF0wp91|@&Pb}AojY70Pw-b9n zgbv;kbAwo-#AD+8AddS|3a13dLyI(Ip%dbInF$G#IV&?Uy22r^I@&W#R?5n|;xC6G zW2>!o!UQwxw+X}JHY?SS1@A^k!7463VDIuMtT8Gm<79<6xsqhmp;-lO8t7-yCTve& zYel^iYvh~LHkR}2W3%=C+y&-a#wH6jp+*DF#T-U@hicvL3P~u`hu}Xx8{NUJ5F_LH zOL#D(Njy89617m#X6u^l*Cr3TYn_`GT9)_@gKN)~mfTKVV|IxtIq^gWZtUucbgH&< z%{h6poHNofpX5-V>d>JE!8wF!y}+}6M`b!7#P({#Q`2Do>g28y} zEekIi+~|Sm>$f}Cg$xsU?V_u%b|P-6x7$7MCb|`oh%9XZ%(O#a>yI`@ zTY|S;Z99#u%gw1`sYL{{73AitB_;K1NMF|Vvc@KgoSk{F=O0|ADSEUy) zj_pEcxyR_@C9Yc4)vR7y9(CiPYjYD{>Hf`x9qO~9c`&JoI?G8+KR+;fJ!QrfPM2z-2@j-QX4^VC z35q29lwxk9r?vuUXQueJRjwi}nu-c>TCWt-@?5n>6nnX~%H+(F>M>SUz*{OG{&rhY zq*U%^BqSG%pCk9dli!-EwrWN1clLW^HZ;rCDeZ_sH{$Mmv%_tRNy)4Ms!3rn$mc$> zcl<&e0=xEQ)_QE6P$4e)f>l_zQj>eGNP416JOJSV$({*Q-7TPIz;^yrC~~&uV{8U& zi^`@zq!{qF+r0e=Q=051epupRt1_zw2} zSHTy-=fKCn?cgTR1PyQn_&I)npMVzu#TQJ2XMnT8H?aHv5quWB5Ii3!#$Xx@gPq{f zK=B3d1pf_O4-SG$!BfB*@B{n-?*MNHF9S~pXMio>U-1dN7c{^ja4FaZ6kBitct8Gt z_kmY{=Yq??CEx&94W10XkAL9rK-l9y&ENOo7q}UW0>u(+0*?SEf`3Z<172kbTb?$^ z($i=j^6cUyzR80Q3p`FM2yEp=fM1d`=W3$q+GtG&q%XFsDCltC24-S3k>83&zRxHb z-Z_EOescw5pIt{LPBAx^4E!Xv?zraX00R-_ffZwRYaW?~;Qp5d3uhP1O^P^?$1|S7 z3(YyV$W1(~ZR+iJ>)3^`f3V-BFlwdGWQ+*29`sv%*WPbk)?Lj&#`^G% z3$9C_aO~vkzb1j1!?%wgKEq^gZjX3=^xJE?8O_WTOyWR_%241-i=F+~o(Y=L;`j zG8|x!U>%uhN!zNMnYTKj64X(|}lZjoKvL!@XQy(%j z*M?j63BQjuRS6(PQ1#J&{Z8<{E^awbg4E7bLVJt`=1ufaeyME>gQ&#Fceg1#YfA4& zT_vBy2N!HEA9bBgx{SxJkDbL9jBimJVZq#Th7!TtRLn>wRK(p5X6ADCm9OH;4Xj{` zXLai?x_woST7p9qNlo~Kl(*xGbYmnjC@QwIp+_yk#L+TmU3IES@|3r{1D(`Mt>H$a z){tj8v4~`W%od3!u|PeWy`^@pq@s2ZM%-vjRf`L80n=6Ii&*V*?!zvmG5x?r3m#*~ zZqqhPw}|1wA-s~(6EeAir6id90d7eb2CVTgCG-!zXC-eta>)6g{kH9;N2b6;nr&Qn zt3M@v-TdSJ55J)BahL6R)O~Vpau)nLEY170o)@|s+ZLSVJT*=&r;fqH6E26E+AZ__ zxJe|LVQhggGrPbo!J+Q`ZOB4aB;1Z8S56443e)r&H(`>{$4R=rKx`&bEy>hk8naod znc2H05tJa~eG7Z{by+e$D_qo`a4d#vr4L&3Js#rAa$_oG73C#jiBPt=%`Pa@ZVc>> zIf0kvQCd#~~hfi}*A)rISK z`ofTliKojy5oq8Th6JSwyQtcnVvg_`k?Au`2BjTU<9IK(%;&?0J#D)a*Dr4^S<>nx zJ@eFU8{fWSL-cNDcF-f)gPcO6{TSD6>v{D}hk!Zitt6>8)j7=FwIGi&A$`ir-3&7= z9b~h#?)XKr-8gG|rD3T&d}Y!dB4RJ;ODS=zgWw!+68LBA`?rBNfSbV)a4v|z$Fc1{3T^|pfZzkr zIRN9}OmHVQ{%65E!E=H38jOQ&;Bnv-pfmm65B>-|4_pPF4K4=#;IZK0;JeuR9|j)+ zuLO%=9E^cez=_}$*#0+xMX(J#47?JXe+$?E27vnYKJNp7V(p){!Du@Y-E3>ydNU_r zdC8kd9kH7Dl7msQqXM2@hT&u^*hyh9pbmHn2kNk1AamXPfZI12K}?lpgnL5SkzwI-RzFZQgK*pPTWSQP3%GBs1a?RvZL6T{-lSg zFkfz;9${>i>t~vKGuRu-v_lyzz?x4&V|Cd>gHmyYG8v*PUz_(_c9NL6WByE5T7sNf zOd>N`R8jx9(ZNqxJM3V(!1{h?Ku8x9v*qezbV{|pu&oZ(NN17do>#(-Dw(6KZ+}KN zOI4Y@abqqHRc8!{NG$iVm*2L`(uzLi?lKCrR!3T*a6$cOsxJg;kFd#Z2yg^Pog5;e zjS=}Nn4erZ&YbmTGKQ*Iw>ln81-fZ0y{0m|*uR0vtnzJgLqkHHXG?vw%eKd^iuR9< zES`O4v}#f2R2oOQq|^{9$?&RJ7bizr#T9DF7+LR`+?`66-nP*LH_&=5Y$9&Mi3Gtc zcG@mtGY$R|!@3A5HD8fPo?Z;htfiy=5kVXi1y*Wnx)RBToTHY%fVKu3bm`<|G?BA< zR<0AB3Kw(_w~*;r#-yP!k2zkQ1UPO#(z5K*-fY@ak z7;C{pevQRgK+m_|IL;_=0UsGdpi&N_LKi8;Sz*iey`usXiD>SF&jZ6|qR+(^j(Do5 z)|Ifo{YeU55VXZ37>UA|_77}vbAg))) zaAG@j0JUx(^_FkA&#R?Fy$1C-Zv4VEaO%4$ zxS@DSjc!%Yb)&Bv^vEv*C&rvoEaB%OZLA$`>_>e|KDi8`uhGScKv&S&i;EP zh`~W{4mbt;7#m+_0DcvG2)qW|0A2(x1KJ;O23QY%hTVTR_&)eY@CNXD@LKR1p#1`) zU>$f6cp&%*w*S509`HhNDG2_7hx5050xtnKfoX6q_%gQs2Z7G|e-=0Z9uFP`v_C-S z|9usF8R&ffw*bWgTnnBJfnv3~mAM051U7 zf#(68S+E;C4*VG2_Q?10XXcO-!h$6B4S zf!az-$eUV3xElPCS{4&*YAJJ;TUYcq*R6}T%a}?pXWNkIcyL*kZ5wGgPhBsVT0dBRM(NIQgR{=%q{64 z*vhfo@?)YlBz9FxW++wW7;eo(j=Ne%GaMBz$)#hLMPY2_tgfHcQLNoNy>=>@-+2#M zAd_Vube(W&pQ`7aw^pLuw-}F&uDV#ZOe1$zBQr;yE?8dSY;JdexF0S|B(6wk&2i$n zgwLv#G1}d2sKpw2X7@lhrLuiicU(!UvLYZ$wYG7q&R3Cn+gG_dx#F}#v5Bvllt#G3 zFDRwgWrT`*sJDBmgHh&8T9~FW+6G6w<)@4jqA&HQ2Wr`axvUaeQ>JywkQKb7Pn=)= zEU)F-jBIH_d@zM8$r4|_>lxP>NN`}m@m-&NrhT7fet_)tEF=PO#0oa&fK zkl8dx3Yq)(1SW!0ZZF4#5|L!fm-(Fu6X~oOHWOIyxhcvswfE4Gv1^dO?2aD2f0_i1 zYU>?x%?&SH?rxx4y6jY*tT$#ymMG=(%%&yR>`$ff0cDo0k6ZR%m=KtC1-D_`@(XU; z&U7a;RUxAfbrcCw-pI{qeKuaSZpwpFlR}8&(wCaR8TPglUpaAAwke#oZs*R-vd-iF z^8BDNYlGGg2{%%yE26Tom`rm{I!_*$8k-#Dg#TTGlY4ZqBx^P>j7@=~3aD|X`HTP8 z%a`F|?6ZV-1FF6E`b{&;On%)D$O+YTV>UQ;DNrv3us1bL7hAl6Mgrf5o|ibWUN3zK zUR!=i_w$ZCKlqhq-8v2{*4~HvwA#{*=S8)ZVt`*l!AM-w}L7Pw@^1ci7I7t-#z)G+P{$B82q# zr8D7Tl}Yo;!Wv$kw7^v2Ckj_n+xqP999j@9*7#!cvuURstlH5Ld$|M(s_UxIaX5}# zU?_j+;f<~q(m;8x+Cp=sZ3+2AOHaZuy>#R>*M>xpyVtg`vw4B_72Sgb&K3)CC$j%B zP5Zv+*Z-^D{{J{O{vAN?08s4z;C|51b+a|0rx_PJHfkw-p|(r zhrpx2kFdo*2Hpp52G;=D>}$ajz`tW}-wnPG?f|mYwZ?x0On?Uit?7RW90qkT1D*+V zF2D#l3w#lKdk$OzE&>;VGr?!DvEK)DX5gQKSAds;X`uCfoeiKq{6f0cZ(}s$+%#1& zbq+027PX@9*MebJY5E(@v4ly*j|I;-o-3tYA-MP-mg*?ml|sTjqwD#Ha;vXP**KBX ziK^lxllKRzOdI&2ePPF>cV26#bXhl7o7~F&GM+)ppBaW?@&S~b!D+|eKmmz6`-xA$ zD>tsPsoTT+fO?{^rJt9F@PVlAnFV`Q$8@ao!I54F@R=F`f!c70rClyzs_>c*AA*1v zVtC)K4epI7>`73d>fs`ju{ocI2QM5P8P##g!Z!aZJHCQzT5J6dTB%WU79B6lmMxE z=UwWaF4Fo;*e)eZm_ovY))wt*y~LjNR+)Fv@E$QbWIE zQ-d5gjp<;$3@^bnXnGx7{Z<{l^so}Q;uRMw0+~WHMrC!9vYsx~i5f^EFZBZ}9|t|# zBaCXA&Qi)|j)jC5UAzH$CVw#G^2dDYK@FrK652|-8n#5%hsir-QWch+53=S)yCXZJ zv6vUgwzJCmOhk&85GbKiNc1{0(a!pA$;$z~^<*Tn{j#ooMj^ev$DLg2CSk7*w8!2t zp4=3w#u@>BHDqdQsMSuJcU=vg58n|j4Uck)H%(stRfN9Pu37sh$9H4+@ctoAHc%T- zF7)Mon1~CjR7#ULoYdL0-L8fBY-e@w1|)0^y$;Ofqn0EKvZKlS60!%NST4>S(y=F) zM7aY{Y(@FX_!#YT765;Hd%|j(IRxgHx2e>JjKOMC)>N>%;9_^^RIs#syio|$BsL>W zYyu~YW9iAFgQ6c;Z*l?jc0pbty;q6Z08V52swdjBdchG-x9Rc?+_k=FL4m4v?LY$U zT}kPIW$F$-4IwpdcUf3Vqbjfixy2PRbK1AshB{jBEu==5 zZQgS370}AgNX>FEEgymi$A90X=gr_P;2ANRW7UyPv9GDXQI#p%rOhh$D%mxW#w5Ar z)cY>VeP&n~9kR1Xa@w%Om#<)Y0#G`hOwv-Sdp6sW_S{jfI}C)noP`6TZqhjk`!ikO zLC}!dzeu=k=ah-dVvTZlGJ>P#^uzV8q&6hlqv5TqVFK{ANta792^kzp3cW)OG-$ij zz-X5oXhbtrQ6AL@aD!zfT%G6VZk(Lq;lRDiac9attbU6MNdS?v*;Pe&r^{kDTGyCe4s{6rc_9-RSWX}FT0>0dm)jZ778ffV_S>A_wOHlYk6N{0hucWHOidizKRmT#WOQ)s zAVFQ`^dE?J503AiVxx;RVCysH$d-9)+>U+D9&o0%cQ?5yGpVGUNK#u8>jTXuth%6! zdNm1#%$$nV@g$RE=3>RD(|IP|o&KwJN78dn%H5w6%`~sc{@0mvI?rC~{|CMO|0V4F z8^AT-DzF!v4desRy8lPPtHE=?ESLbJ;2iJ}@I`F?cLALXsQ3M8o&P5A$KWz>KG+J@ zfIG1LZw2E(@&D_;V^nb>L)h68I8!zRm`GHaG?@2CKkn;BIVv zt^2;2eyLqz-izF@L=#MZ2#wj>p&5VfG2|poCR-2U*}-=^bRzJIF$3X2@to!O^=5O-EtnrFW2pU-E=Ji51^F$U;kYA*ez& zfp^M8`lQYlnUQkJuk%3`!GByb-5DnaBDLx5 z|I4wHAAub$`+v1p!+(sOe=GPa@E&kI*a02{{ssG9=lxBCVQ>K`fV;5eW$P=p|ApZB z;5wjp0-gm<0C!^79|nuyEO09L7s~zw_)~BQTnZG=-w&PubjF|6Z-pj8FIkTQJqq+F z(4#<)0zC@!DA1!oj{+;9fZQ9-d7OUt-l62et9$QIO4fCQQ)-EiD;r7Y_kLzqi)~}S z?c-zTw@aHWHn$Y!({><-xy&%p@PYN-p=2xi8lWM%D-$$&KVBLUXsVprB173$6k^Ws tJ>I`JM|7;#iryWOt&qZ*oD*~fua8JYl<^MceD*y`a;0{EOM9m6{{g%nAGQDh literal 0 HcmV?d00001 diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a688e9981e..ded53ab4ea 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,7 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->mfp = pInit->mfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -119,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg); + transSendRequest(shandle, ip, port, pMsg, NULL); +} +void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); @@ -140,6 +144,10 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { + // + rpcSendResponse(msg); +} void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7100c34845..18a0611b75 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { uint64_t expireTime; int hThrdIdx; bool broken; // link broken or not + STransCtx ctx; ConnStatus status; // int release; // 1: release @@ -207,7 +208,7 @@ void cliHandleResp(SCliConn* conn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -283,7 +284,7 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -374,6 +375,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; + transCtxDestroy(&conn->ctx); tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); @@ -436,7 +438,6 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; conn->cliMsgs = taosArrayInit(2, sizeof(void*)); - QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -446,6 +447,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); @@ -455,6 +457,7 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); + transCtxDestroy(&conn->ctx); taosArrayDestroy(conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -630,10 +633,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; + transCtxMerge(&conn->ctx, &pCtx->appCtx); if (taosArrayGetSize(conn->cliMsgs) > 0) { taosArrayPush(conn->cliMsgs, &pMsg); return; } + taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); @@ -825,7 +830,7 @@ void transReleaseCliHandle(void* handle) { transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { @@ -835,13 +840,14 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } - tDebug("send request at thread:%d %p", index, pMsg); + tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; + pCtx->appCtx = *ctx; assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not @@ -855,6 +861,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } + void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7123593a33..2c90efc3aa 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) { } return false; } -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } -int transUnpackMsg(STransMsgHead* msgHead) {return 0;} +int transUnpackMsg(STransMsgHead* msgHead) { return 0; } int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); @@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) { return uv_async_send(async); } +void transCtxInit(STransCtx* ctx) { + // init transCtx + ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); +} +void transCtxDestroy(STransCtx* ctx) { + if (ctx->args == NULL) { + return; + } + + STransCtxVal* iter = taosHashIterate(ctx->args, NULL); + while (iter) { + iter->free(iter->val); + iter = taosHashIterate(ctx->args, iter); + } + taosHashCleanup(ctx->args); +} + +void transCtxMerge(STransCtx* dst, STransCtx* src) { + if (dst->args == NULL) { + dst->args = src->args; + src->args = NULL; + return; + } + void* key = NULL; + size_t klen = 0; + void* iter = taosHashIterate(src->args, NULL); + while (iter) { + STransCtxVal* sVal = (STransCtxVal*)iter; + key = taosHashGetKey(sVal, &klen); + + STransCtxVal* dVal = taosHashGet(dst->args, key, klen); + if (dVal) { + dVal->free(dVal->val); + } + taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + iter = taosHashIterate(src->args, iter); + } + taosHashCleanup(src->args); +} +void* transCtxDumpVal(STransCtx* ctx, int32_t key) { + if (ctx->args == NULL) { + return NULL; + } + STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key)); + if (cVal == NULL) { + return NULL; + } + char* ret = calloc(1, cVal->len); + memcpy(ret, (char*)cVal->val, cVal->len); + return (void*)ret; +} + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 321a3489b7..6be664233b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -403,16 +403,16 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { - STrans* pTransInst = conn->pTransInst; - if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { - STransMsg transMsg = {0}; - transMsg.msgType = conn->inType; - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - // transRefSrvHandle(conn); - (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); - } -} +// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { +// STrans* pTransInst = conn->pTransInst; +// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { +// STransMsg transMsg = {0}; +// transMsg.msgType = conn->inType; +// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; +// // transRefSrvHandle(conn); +// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); +// } +//} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 31015359f4..deccd633d8 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -86,18 +86,8 @@ class Client { rpcClose(this->transCli); this->transCli = NULL; } - void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - this->transCli = rpcOpen(&rpcInit_); - } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.mfp = mfp; - this->transCli = rpcOpen(&rpcInit_); - } - void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -156,10 +146,6 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - this->Stop(); - this->Start(); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { this->Stop(); rpcInit_.cfp = cfp; @@ -252,23 +238,11 @@ class TransObj { // srv->Stop(); } - void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPersistFP(pfp); - } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing cli->SetConstructFP(mfp); } - void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPAndMFp(pfp, mfp); - } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - //////// - srv->SetExceptFp(efp); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { /////// srv->SetSrvContinueSend(cfp); @@ -375,22 +349,15 @@ TEST_F(TransEnv, cliReleaseHandle) { req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecvNoHandle(&req, &resp); - // if (i == 5) { - // std::cout << "stop server" << std::endl; - // tr->StopSrv(); - //} - // if (i >= 6) { EXPECT_TRUE(resp.code == 0); //} } ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - // tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -459,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } TEST_F(TransEnv, queryExcept) { - tr->SetSrvExceptFp(handleExcept); + // tr->SetSrvExceptFp(handleExcept); // query and conn is broken } diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 53910aa30c..1f8c8e8ff2 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) { assert(result.size() == vals.size()); } +class TransCtxEnv : public ::testing::Test { + protected: + virtual void SetUp() { + ctx = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(ctx); + // TODO + } + virtual void TearDown() { + transCtxDestroy(ctx); + // formate + } + STransCtx *ctx; +}; + +TEST_F(TransCtxEnv, mergeTest) { + int key = 1; + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(2, taosHashGetSize(ctx->args)); + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + std::string val("Hello"); + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + { + key = 1; + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + + char *skey = (char *)transCtxDumpVal(ctx, 1); + EXPECT_EQ(0, strcmp(skey, val.c_str())); + free(skey); + + skey = (char *)transCtxDumpVal(ctx, 2); + EXPECT_EQ(0, strcmp(skey, val.c_str())); +} #endif From a9b712ab45891a1002933763f60794edd64f77ee Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 22:00:46 +0800 Subject: [PATCH 3/6] handle except --- source/libs/transport/src/.transCli.c.swn | Bin 49152 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 source/libs/transport/src/.transCli.c.swn diff --git a/source/libs/transport/src/.transCli.c.swn b/source/libs/transport/src/.transCli.c.swn deleted file mode 100644 index 583fbc9f74257c1841caaac6e0c3890d57c227da..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 49152 zcmeI537A}0b?2J^#w>P7SPaa7pJgPfrPk^d%aUoeT2{wWOYTLsuuW54{kpqwbycaV zS_@tP1BrvF?mYPErx zZ$7@NPk&X{d-vVt+;h)8=iGDeo8EL_WN&othRuC^9@*FTy74oo-*@}x`~J(b`}!KS zT1%IY_nAF&_MEexI&=29XFPRwxwIQ6cE;7Yay8yKS8k<0nzhA7Dc)EqPd7KV8pUd} zUTd^AHX9}PYp7D*P+BhA-n||LdK5Sw1r}Sg>(4!@?~E;*H!11qt4@m^_1lxjd#ZQ8 zM}Zy%dKBnUphtlo1$q?dQJ_bG9tD1lDA1}utnWFr=Qn(tj{5Hhw0%Fw|1SFPV{P{z z?0@g_-(P6E|8W0%wf{cN7ZA#MsJ~wI-@|S9Pw>~D;JDtnjp|6u?B>^J-RZUg@vw7^>Mt&{uuz7AdrE&^-7>ELrG z_4WM`xBii3#H0Md>Nm_-+(^> zF9X+tLGVCu8~wWnTm;nDPXa6WG+V`1xfC6)m1m+-r92*2XD(hWw+iD#WlYRBX7)`# zYi-oxkN&N+HIulomgIxYW7X2`Vs)kxUv@>mJ+!wuw>GNt|3H+>qpsZgqixYuJm(%6 ziw_N`Q1?s4W`2&vmAKf9ui2U_HA(Gnub{{-86>S-zQSFers^vydgI1u-pCU*;zQAF zqqYzg>-9V(lQs<>EyZ<5`k`92x;85DKXgwsi7iD`oUSdl>~1&WH0xHq_H62nS}ATe zqnTo>NR5g!u1d|gaX4-aM8%nzXeKVz7U~V9id&_qQf$SIe0?K*sI+(`Q%YThQQIt& zgrqy;W~)&l=DgSgm;Ho}JbZk6hR(zQlZt+m!} zOmh|^dQ$KQ?QX?M#>+A}tuz)NnpV||;`XjJC#^DdNlW|{ms(0p#}7p1YAX_97Mrd= zOT|iMx>&k0D$YV*>&hr<)gspuIU0IOyL^u{%B^^%xmse^*UGw9x-xFHXFV2|4!f-B zp}4r9yeV-YYBtNyrlhDUHep=Ws?Ame5vCVsm4>g5Epc2`oj%rzo1VEkmOEIf)JiIR z$Kvd4+;CNB=M8G%D$!h=n}dbonJHZu=u{5~S}Unq(JTzmAm{d`PLvm7#*Hhg#gB#> z;>vVwX~c77N`aenmrje~VrgFDPeTP7I$n#9*2|69jlWzW#_kLv@xerGKU`XnU_~DC zr;~@el?-e@R8sbwi(5*t3pugm>C$|$u{J8#X+7L@#uoZUadat{yP{Nw88_UNRa-Wl zQkRkeVTp}Zp|T^1M1pUxh$6XzJc~-jda+b)9g8aEg>p+0szhPd&c42Dko^4G11-1q zMbV+fc+pX7YhUYFJ)VhYBbaTm)JoGuSGmV3WhNXwl$|@EAET5m@b^^1^iy42*f;C$ zH}zy7j!RLl&csJ`S>#I}suZW=%4HMdJ9kX&8Qd|vXKK&z=oKBRXZ1-d=Y@sxGZ~}| z4>51)LPGxfgkl^=7%yA7y>TP=)MIC#snHZ4ZFxTACS=nEV>c+ZWMVs4OxinC=J1r~engHKUD=GAoD&1AjZDtz zPu(DOrBp@$}$4+V*+j<2mZH`0+Gm-PnLPy75+bz zd){64gc!G6#dt%lqYN=oZ3(+o$`qScNU70lRG9IjWF#Elm}2N7PV}b=>~J^Sg8Vb_ z^x~Y2ib`z`_M5FK$J`)QCT{#jFeo(izmfN)#YQ|6?nljfS$~r&e`!q3mgh*or3}+I zoNDBHcJp}%J5!ZNr1p&4G((*=+jtJ-mM2f^XvD{eli^`qn{L#ujH@owO1XNazYw|U zv@chKD4JL2j?5TmCfu!ToUjm&t06h2r8!Vryw+$GkHJs+Fxk*0HgdD9Ytxn`R-4=; z5!vLz-@2YlJYQ?JR3Y7fVf`#g?)DeTn!l0GaqcgFSrxO{+N79IJ~1^mylZOb@X(&Y zvEf30Ctgz3#7w7`tEI}~OdPGU@aCb~!or66Rk}VaVK+HGJT-ZMK3|J0V0cw()w$?M zxuuz<+>B7l<{HI?4bjM~6c;AFIF8PrZ?)>@ZQOX|$dL_m)x{09#vB3ErMTLRn;W-p zP(BB1i&1mFwpf{A5<47g&Wp>3B*u==tByErOPmb;1H$H4-4vrp-E*Tsf8yuZD zxYgRA86iF#SFNn_LcLO^21knYaFywSI_w=D8`@3!!5t%eMkWrzPbzI9)A9%+%rIFr z%F{@_8O^(@_F(DwSwDjuxBo>>_h1DWgk9CU)wdPR}zoh_BK8nCXbH!8~ zh)|)MWE7pzseQ95w&{yKFFHowm5Nn0XMm1NTTi8{GaG5kLT#ozd(5|Oi2iJp=jK~c zq0}F3*|hno(Zt}s@n|P{{6I8PEp3R-f7X1fSbG|1O1s~H7Qel3_a0t{u6+9Gt1-tx>;3c2}=D{3T4;}%&hmHOja4UE{I0k0G zFxUwm4erBc|2DV{ya~J!Tn^TOlfh4+@i)Qez@6Z;Ky?2&=zV$==ux0YfnSOOr{i)d z&tfW}=V(SL)W_om3^kPKX1p-v?Y_3FM~e9K@(EGqbpdZ&9c6j_cKxtiWVy(%W7a?c z-F{~M_QV_*h}IOGy0(7%p?)hiYo;-2HfG4OU=ABO4*OG6X#I9PG~U*xJd`zO&s}B) z8m1R*2zg6uC!f(;W1(m^NVtIqVq2sc?!wrtD0JL~`ed_URSAhO2S~NH?$x+(8I_mI zYOWfpdZTuluR|8WHfCcv5DJ@}&!IMJjo_Br8R%QCijoP%2BJ;;v%>wwv1VO`@TEf1 zl%4Xo9&0%=x|BhuCUs)A`aCf7l`&sw#$}a-)-qK43X*QVsN>hymQ8<2tKYZeiqy}mOm^(QIM_eLbI+`*vL!@`860tG*@i#D;h=E zwZ&F#0ox<7DvM2go{Xxb^VV;dIp!F6o2j`|#tB1vMutZx+-P*80#CYQ&$T5ts@>)P z3&RL1EI*7a!LMK#A;@>iolscSgh{asFrblo*gyzZB<6XmXQI`(0B4%5^P<&Fey);p znISje7hU8a^nSr8azhfTW0-kYKI4qRXDo`HYL0#?U(kvTav6&j1JwPX8(MqfDpG&8 z5?A|^L29l?U1FGR5_9}N8u;ltK){Vw$YqI^+4YRAtF+$`H*M_0!%hE_KvxSVTxb)C^P8`*$K>N_&Z>|ze!r6u4$ z2u1OrD+$N5l@uk;p$^ZbLn2R0mz$~?sF_k`l4X|J<>@jxiXUlbp<7)>Mu$4qMfU#_ zQ95r`AUpPd(JP>z#K!*wcr|zxI1C2C8Q^4a7dHK!;CI0|co29scKp-9W57RPx8DGE zfd_yOV5c7evfF=vEv~ir5|{)NU=8?J?C`$>Hv`$_PXk-PW^gk29(MR0;4i>+U<&L9 z`@mDc&#}Qj2i^gG4`^-uVsH<(_ub%w;5p!0a0xg7WSifI{rx@g1@I|wD|jFHD{vFI z5}XgV0 zgU5ic(GMR4>W>eAiX@P4Y*=_iGZJ8oPA0&T0o~SR&>if zCx@6VTv;xNfDI<$lTnsj&S-?1QZMkjYq)%#v8*w66jz$D>1)1j$~B}zMp<=RM|UnR z)DILZp|(uW8nHA*$q_@E-;>cRaURA+v}R3onhns=eN+2~$Hqq(qB{;w4esAxP|N*1 z)aLE?ihyNA<;f2~oXXqL!3)!dm zsRyBCA*~A+t=ki2Iw_V%uiC^+1o2gt)!gMc->OwB9rEz$d~0&4bS+46*hd#KiR6*xWC&%dS18milj&JC(77BWg4_gu zU~tbAdW}o$3qV7Oj?9-Uaa7pUZ#2I~gkSb^d7ry&BWf%=(_1yPN@t&tPFK^qS=9zh zEHpP_m(-=rB4#N?wS$7yR@GaVC7?HJp4(eSA6PHlA95f z$I=O*a~7@-bx(cXRUBnUK&OmCO-%A=?Njo+>%>0K?1(q7J)>ME%ju%+4`zRdO!5-T zWLvOzv&lRE?J^NX3RYL?!4~{r!dZgbt_S6TcvD=YXk_?v*Sf~t52@I>3jS)_nCYmy zS&7T#(w2!#=!{8fY(`5HU#9+KDrTOwxLl{TFo^}-LX&8skM#4)!kp4sG_iX#UR5vm z>2K5U7S-t68iep4^N#OrJS~gKH}AuEEOXc{}vu25$06Oylqi!`dQ(T)K<^otUyW0hhAl1w@e`jM}YqI;MJG9L4wd1g8^O`gqn691`I{8N>7 zt-=cI8YRg7e=BzFSFnR+|L^h!=jXBUZv{_j1r*=^SRlVZAGi~rz{kMff{%b#fulfc{Y|hM+>L+WHQ+3; z85F>iz<&n+3H%uUz(>G!;F(|y>;bz$1n$8B{0AQbw}3hDB=7+6Q~3nIP2ibe3Jih^z;^Iha1XwK z+rW!~d<2`pJ@^5>2yOwt4-SC!;L+eG_yoQQ?gBdF;0~aV*xb3s(r*QA#lgIfGdOl+p zFs-@Vb`DMq7Ut7nS6fzbHlH4`fG`XFnol0t&EigBP2D;EeJ#9(eVWU3WPECP@BWE{ zX)0F7GI+a)UeVi-+RoF0wp91|@&Pb}AojY70Pw-b9n zgbv;kbAwo-#AD+8AddS|3a13dLyI(Ip%dbInF$G#IV&?Uy22r^I@&W#R?5n|;xC6G zW2>!o!UQwxw+X}JHY?SS1@A^k!7463VDIuMtT8Gm<79<6xsqhmp;-lO8t7-yCTve& zYel^iYvh~LHkR}2W3%=C+y&-a#wH6jp+*DF#T-U@hicvL3P~u`hu}Xx8{NUJ5F_LH zOL#D(Njy89617m#X6u^l*Cr3TYn_`GT9)_@gKN)~mfTKVV|IxtIq^gWZtUucbgH&< z%{h6poHNofpX5-V>d>JE!8wF!y}+}6M`b!7#P({#Q`2Do>g28y} zEekIi+~|Sm>$f}Cg$xsU?V_u%b|P-6x7$7MCb|`oh%9XZ%(O#a>yI`@ zTY|S;Z99#u%gw1`sYL{{73AitB_;K1NMF|Vvc@KgoSk{F=O0|ADSEUy) zj_pEcxyR_@C9Yc4)vR7y9(CiPYjYD{>Hf`x9qO~9c`&JoI?G8+KR+;fJ!QrfPM2z-2@j-QX4^VC z35q29lwxk9r?vuUXQueJRjwi}nu-c>TCWt-@?5n>6nnX~%H+(F>M>SUz*{OG{&rhY zq*U%^BqSG%pCk9dli!-EwrWN1clLW^HZ;rCDeZ_sH{$Mmv%_tRNy)4Ms!3rn$mc$> zcl<&e0=xEQ)_QE6P$4e)f>l_zQj>eGNP416JOJSV$({*Q-7TPIz;^yrC~~&uV{8U& zi^`@zq!{qF+r0e=Q=051epupRt1_zw2} zSHTy-=fKCn?cgTR1PyQn_&I)npMVzu#TQJ2XMnT8H?aHv5quWB5Ii3!#$Xx@gPq{f zK=B3d1pf_O4-SG$!BfB*@B{n-?*MNHF9S~pXMio>U-1dN7c{^ja4FaZ6kBitct8Gt z_kmY{=Yq??CEx&94W10XkAL9rK-l9y&ENOo7q}UW0>u(+0*?SEf`3Z<172kbTb?$^ z($i=j^6cUyzR80Q3p`FM2yEp=fM1d`=W3$q+GtG&q%XFsDCltC24-S3k>83&zRxHb z-Z_EOescw5pIt{LPBAx^4E!Xv?zraX00R-_ffZwRYaW?~;Qp5d3uhP1O^P^?$1|S7 z3(YyV$W1(~ZR+iJ>)3^`f3V-BFlwdGWQ+*29`sv%*WPbk)?Lj&#`^G% z3$9C_aO~vkzb1j1!?%wgKEq^gZjX3=^xJE?8O_WTOyWR_%241-i=F+~o(Y=L;`j zG8|x!U>%uhN!zNMnYTKj64X(|}lZjoKvL!@XQy(%j z*M?j63BQjuRS6(PQ1#J&{Z8<{E^awbg4E7bLVJt`=1ufaeyME>gQ&#Fceg1#YfA4& zT_vBy2N!HEA9bBgx{SxJkDbL9jBimJVZq#Th7!TtRLn>wRK(p5X6ADCm9OH;4Xj{` zXLai?x_woST7p9qNlo~Kl(*xGbYmnjC@QwIp+_yk#L+TmU3IES@|3r{1D(`Mt>H$a z){tj8v4~`W%od3!u|PeWy`^@pq@s2ZM%-vjRf`L80n=6Ii&*V*?!zvmG5x?r3m#*~ zZqqhPw}|1wA-s~(6EeAir6id90d7eb2CVTgCG-!zXC-eta>)6g{kH9;N2b6;nr&Qn zt3M@v-TdSJ55J)BahL6R)O~Vpau)nLEY170o)@|s+ZLSVJT*=&r;fqH6E26E+AZ__ zxJe|LVQhggGrPbo!J+Q`ZOB4aB;1Z8S56443e)r&H(`>{$4R=rKx`&bEy>hk8naod znc2H05tJa~eG7Z{by+e$D_qo`a4d#vr4L&3Js#rAa$_oG73C#jiBPt=%`Pa@ZVc>> zIf0kvQCd#~~hfi}*A)rISK z`ofTliKojy5oq8Th6JSwyQtcnVvg_`k?Au`2BjTU<9IK(%;&?0J#D)a*Dr4^S<>nx zJ@eFU8{fWSL-cNDcF-f)gPcO6{TSD6>v{D}hk!Zitt6>8)j7=FwIGi&A$`ir-3&7= z9b~h#?)XKr-8gG|rD3T&d}Y!dB4RJ;ODS=zgWw!+68LBA`?rBNfSbV)a4v|z$Fc1{3T^|pfZzkr zIRN9}OmHVQ{%65E!E=H38jOQ&;Bnv-pfmm65B>-|4_pPF4K4=#;IZK0;JeuR9|j)+ zuLO%=9E^cez=_}$*#0+xMX(J#47?JXe+$?E27vnYKJNp7V(p){!Du@Y-E3>ydNU_r zdC8kd9kH7Dl7msQqXM2@hT&u^*hyh9pbmHn2kNk1AamXPfZI12K}?lpgnL5SkzwI-RzFZQgK*pPTWSQP3%GBs1a?RvZL6T{-lSg zFkfz;9${>i>t~vKGuRu-v_lyzz?x4&V|Cd>gHmyYG8v*PUz_(_c9NL6WByE5T7sNf zOd>N`R8jx9(ZNqxJM3V(!1{h?Ku8x9v*qezbV{|pu&oZ(NN17do>#(-Dw(6KZ+}KN zOI4Y@abqqHRc8!{NG$iVm*2L`(uzLi?lKCrR!3T*a6$cOsxJg;kFd#Z2yg^Pog5;e zjS=}Nn4erZ&YbmTGKQ*Iw>ln81-fZ0y{0m|*uR0vtnzJgLqkHHXG?vw%eKd^iuR9< zES`O4v}#f2R2oOQq|^{9$?&RJ7bizr#T9DF7+LR`+?`66-nP*LH_&=5Y$9&Mi3Gtc zcG@mtGY$R|!@3A5HD8fPo?Z;htfiy=5kVXi1y*Wnx)RBToTHY%fVKu3bm`<|G?BA< zR<0AB3Kw(_w~*;r#-yP!k2zkQ1UPO#(z5K*-fY@ak z7;C{pevQRgK+m_|IL;_=0UsGdpi&N_LKi8;Sz*iey`usXiD>SF&jZ6|qR+(^j(Do5 z)|Ifo{YeU55VXZ37>UA|_77}vbAg))) zaAG@j0JUx(^_FkA&#R?Fy$1C-Zv4VEaO%4$ zxS@DSjc!%Yb)&Bv^vEv*C&rvoEaB%OZLA$`>_>e|KDi8`uhGScKv&S&i;EP zh`~W{4mbt;7#m+_0DcvG2)qW|0A2(x1KJ;O23QY%hTVTR_&)eY@CNXD@LKR1p#1`) zU>$f6cp&%*w*S509`HhNDG2_7hx5050xtnKfoX6q_%gQs2Z7G|e-=0Z9uFP`v_C-S z|9usF8R&ffw*bWgTnnBJfnv3~mAM051U7 zf#(68S+E;C4*VG2_Q?10XXcO-!h$6B4S zf!az-$eUV3xElPCS{4&*YAJJ;TUYcq*R6}T%a}?pXWNkIcyL*kZ5wGgPhBsVT0dBRM(NIQgR{=%q{64 z*vhfo@?)YlBz9FxW++wW7;eo(j=Ne%GaMBz$)#hLMPY2_tgfHcQLNoNy>=>@-+2#M zAd_Vube(W&pQ`7aw^pLuw-}F&uDV#ZOe1$zBQr;yE?8dSY;JdexF0S|B(6wk&2i$n zgwLv#G1}d2sKpw2X7@lhrLuiicU(!UvLYZ$wYG7q&R3Cn+gG_dx#F}#v5Bvllt#G3 zFDRwgWrT`*sJDBmgHh&8T9~FW+6G6w<)@4jqA&HQ2Wr`axvUaeQ>JywkQKb7Pn=)= zEU)F-jBIH_d@zM8$r4|_>lxP>NN`}m@m-&NrhT7fet_)tEF=PO#0oa&fK zkl8dx3Yq)(1SW!0ZZF4#5|L!fm-(Fu6X~oOHWOIyxhcvswfE4Gv1^dO?2aD2f0_i1 zYU>?x%?&SH?rxx4y6jY*tT$#ymMG=(%%&yR>`$ff0cDo0k6ZR%m=KtC1-D_`@(XU; z&U7a;RUxAfbrcCw-pI{qeKuaSZpwpFlR}8&(wCaR8TPglUpaAAwke#oZs*R-vd-iF z^8BDNYlGGg2{%%yE26Tom`rm{I!_*$8k-#Dg#TTGlY4ZqBx^P>j7@=~3aD|X`HTP8 z%a`F|?6ZV-1FF6E`b{&;On%)D$O+YTV>UQ;DNrv3us1bL7hAl6Mgrf5o|ibWUN3zK zUR!=i_w$ZCKlqhq-8v2{*4~HvwA#{*=S8)ZVt`*l!AM-w}L7Pw@^1ci7I7t-#z)G+P{$B82q# zr8D7Tl}Yo;!Wv$kw7^v2Ckj_n+xqP999j@9*7#!cvuURstlH5Ld$|M(s_UxIaX5}# zU?_j+;f<~q(m;8x+Cp=sZ3+2AOHaZuy>#R>*M>xpyVtg`vw4B_72Sgb&K3)CC$j%B zP5Zv+*Z-^D{{J{O{vAN?08s4z;C|51b+a|0rx_PJHfkw-p|(r zhrpx2kFdo*2Hpp52G;=D>}$ajz`tW}-wnPG?f|mYwZ?x0On?Uit?7RW90qkT1D*+V zF2D#l3w#lKdk$OzE&>;VGr?!DvEK)DX5gQKSAds;X`uCfoeiKq{6f0cZ(}s$+%#1& zbq+027PX@9*MebJY5E(@v4ly*j|I;-o-3tYA-MP-mg*?ml|sTjqwD#Ha;vXP**KBX ziK^lxllKRzOdI&2ePPF>cV26#bXhl7o7~F&GM+)ppBaW?@&S~b!D+|eKmmz6`-xA$ zD>tsPsoTT+fO?{^rJt9F@PVlAnFV`Q$8@ao!I54F@R=F`f!c70rClyzs_>c*AA*1v zVtC)K4epI7>`73d>fs`ju{ocI2QM5P8P##g!Z!aZJHCQzT5J6dTB%WU79B6lmMxE z=UwWaF4Fo;*e)eZm_ovY))wt*y~LjNR+)Fv@E$QbWIE zQ-d5gjp<;$3@^bnXnGx7{Z<{l^so}Q;uRMw0+~WHMrC!9vYsx~i5f^EFZBZ}9|t|# zBaCXA&Qi)|j)jC5UAzH$CVw#G^2dDYK@FrK652|-8n#5%hsir-QWch+53=S)yCXZJ zv6vUgwzJCmOhk&85GbKiNc1{0(a!pA$;$z~^<*Tn{j#ooMj^ev$DLg2CSk7*w8!2t zp4=3w#u@>BHDqdQsMSuJcU=vg58n|j4Uck)H%(stRfN9Pu37sh$9H4+@ctoAHc%T- zF7)Mon1~CjR7#ULoYdL0-L8fBY-e@w1|)0^y$;Ofqn0EKvZKlS60!%NST4>S(y=F) zM7aY{Y(@FX_!#YT765;Hd%|j(IRxgHx2e>JjKOMC)>N>%;9_^^RIs#syio|$BsL>W zYyu~YW9iAFgQ6c;Z*l?jc0pbty;q6Z08V52swdjBdchG-x9Rc?+_k=FL4m4v?LY$U zT}kPIW$F$-4IwpdcUf3Vqbjfixy2PRbK1AshB{jBEu==5 zZQgS370}AgNX>FEEgymi$A90X=gr_P;2ANRW7UyPv9GDXQI#p%rOhh$D%mxW#w5Ar z)cY>VeP&n~9kR1Xa@w%Om#<)Y0#G`hOwv-Sdp6sW_S{jfI}C)noP`6TZqhjk`!ikO zLC}!dzeu=k=ah-dVvTZlGJ>P#^uzV8q&6hlqv5TqVFK{ANta792^kzp3cW)OG-$ij zz-X5oXhbtrQ6AL@aD!zfT%G6VZk(Lq;lRDiac9attbU6MNdS?v*;Pe&r^{kDTGyCe4s{6rc_9-RSWX}FT0>0dm)jZ778ffV_S>A_wOHlYk6N{0hucWHOidizKRmT#WOQ)s zAVFQ`^dE?J503AiVxx;RVCysH$d-9)+>U+D9&o0%cQ?5yGpVGUNK#u8>jTXuth%6! zdNm1#%$$nV@g$RE=3>RD(|IP|o&KwJN78dn%H5w6%`~sc{@0mvI?rC~{|CMO|0V4F z8^AT-DzF!v4desRy8lPPtHE=?ESLbJ;2iJ}@I`F?cLALXsQ3M8o&P5A$KWz>KG+J@ zfIG1LZw2E(@&D_;V^nb>L)h68I8!zRm`GHaG?@2CKkn;BIVv zt^2;2eyLqz-izF@L=#MZ2#wj>p&5VfG2|poCR-2U*}-=^bRzJIF$3X2@to!O^=5O-EtnrFW2pU-E=Ji51^F$U;kYA*ez& zfp^M8`lQYlnUQkJuk%3`!GByb-5DnaBDLx5 z|I4wHAAub$`+v1p!+(sOe=GPa@E&kI*a02{{ssG9=lxBCVQ>K`fV;5eW$P=p|ApZB z;5wjp0-gm<0C!^79|nuyEO09L7s~zw_)~BQTnZG=-w&PubjF|6Z-pj8FIkTQJqq+F z(4#<)0zC@!DA1!oj{+;9fZQ9-d7OUt-l62et9$QIO4fCQQ)-EiD;r7Y_kLzqi)~}S z?c-zTw@aHWHn$Y!({><-xy&%p@PYN-p=2xi8lWM%D-$$&KVBLUXsVprB173$6k^Ws tJ>I`JM|7;#iryWOt&qZ*oD*~fua8JYl<^MceD*y`a;0{EOM9m6{{g%nAGQDh From 916dbe798b233f792e5ae74ce6dc124ab1c1916e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 23:38:47 +0800 Subject: [PATCH 4/6] handle except --- include/libs/transport/trpc.h | 3 +- source/libs/transport/inc/transComm.h | 5 +- source/libs/transport/src/trans.c | 5 +- source/libs/transport/src/transSrv.c | 81 +++++++++++++++++++++------ 4 files changed, 69 insertions(+), 25 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index aae0c6bd22..af5afb51c5 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -38,13 +38,12 @@ typedef struct SRpcConnInfo { typedef struct SRpcMsg { tmsg_t msgType; - tmsg_t expectMsgType; void * pCont; int contLen; int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client - int noResp; // has response or not(default 0 indicate resp); + int noResp; // has response or not(default 0, 0: resp, 1: no resp); int persistHandle; // persist handle or not } SRpcMsg; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a939bbd644..c861ed350e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -181,7 +181,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release } STransMsgType; +typedef enum { Normal, Quit, Release, Register } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -262,7 +262,8 @@ void transReleaseSrvHandle(void* handle); void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); -void transSendResponse(const STransMsg* pMsg); +void transSendResponse(const STransMsg* msg); +void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index ded53ab4ea..317f80c48d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -144,10 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { - // - rpcSendResponse(msg); -} +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); } void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 6be664233b..126973f27c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -17,6 +17,12 @@ #include "transComm.h" +typedef struct { + int notifyCount; // + int init; // init or not + STransMsg msg; +} SSrvRegArg; + typedef struct SSrvConn { T_REF_DECLARE() uv_tcp_t* pTcp; @@ -33,7 +39,8 @@ typedef struct SSrvConn { void* hostThrd; SArray* srvMsgs; - bool broken; // conn broken; + SSrvRegArg regArg; + bool broken; // conn broken; ConnStatus status; struct sockaddr_in addr; @@ -117,7 +124,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/) static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease}; +static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -285,11 +294,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - // uvNotifyLinkBrokenToApp(conn); - - // STrans* pTransInst = conn->pTransInst; - // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { - //} + if (conn->status == ConnAcquire) { + if (conn->regArg.init) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + } transUnrefSrvHandle(conn); } } @@ -317,6 +328,17 @@ void uvOnSendCb(uv_write_t* req, int status) { if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); + } else if (msg->type == Register && conn->status == ConnAcquire) { + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + return; } destroySmsg(msg); // send second data, just use for push @@ -403,16 +425,6 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { -// STrans* pTransInst = conn->pTransInst; -// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { -// STransMsg transMsg = {0}; -// transMsg.msgType = conn->inType; -// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; -// // transRefSrvHandle(conn); -// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); -// } -//} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; @@ -641,6 +653,7 @@ static SSrvConn* createConn(void* hThrd) { pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; @@ -774,6 +787,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { if (conn->status == ConnAcquire) { if (taosArrayGetSize(conn->srvMsgs) > 0) { taosArrayPush(conn->srvMsgs, &msg); + return; } taosArrayPush(conn->srvMsgs, &msg); uvStartSendRespInternal(msg); @@ -790,6 +804,25 @@ void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { // send msg to client uvStartSendResp(msg); } +void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + return; + } + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + } +} void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; @@ -884,6 +917,20 @@ void transSendResponse(const STransMsg* pMsg) { tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } +void transRegisterMsg(const STransMsg* msg) { + if (msg->handle == NULL) { + return; + } + SSrvConn* pConn = msg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->pConn = pConn; + srvMsg->msg = *msg; + srvMsg->type = Register; + tTrace("server conn %p start to send resp", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); +} int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { SSrvConn* pConn = thandle; struct sockaddr_in addr = pConn->addr; From 538a02170c78ff68add22457ab006e9f0eb4ac48 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 19 Mar 2022 14:10:35 +0800 Subject: [PATCH 5/6] merge 3.0 --- source/libs/transport/src/transCli.c | 27 ++++++++++++--------------- source/libs/transport/src/transSrv.c | 23 ++++++++++++++++++----- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 86533fddbe..126b6cb91f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -173,6 +173,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) +#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) static void* cliWorkThread(void* arg); @@ -509,7 +510,10 @@ void cliSend(SCliConn* pConn) { STrans* pTransInst = pThrd->pTransInst; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -537,6 +541,7 @@ void cliSend(SCliConn* pConn) { pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, @@ -546,6 +551,7 @@ void cliSend(SCliConn* pConn) { if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); } + pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); @@ -586,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; - tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); + tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - while (taosArrayGetSize(conn->cliMsgs) > 0) { - SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0); - destroyCmsg(pMsg); - taosArrayRemove(conn->cliMsgs, 0); - } - - transDestroyBuffer(&conn->readBuf); - conn->status = ConnRelease; - int ref = T_REF_VAL_GET(conn); - if (ref == 2) { - transUnrefCliHandle(conn); - } else if (ref == 1) { - addConnToPool(pThrd->pool, conn); + taosArrayPush(conn->cliMsgs, &pMsg); + if (taosArrayGetSize(conn->cliMsgs) >= 2) { + return; // send one by one } + cliSend(conn); } SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 79d1ab85a8..108b12542c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,6 +93,15 @@ typedef struct SServerObj { static const char* notify = "a"; +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + goto _RETURE; \ + } \ + } while (0) // refactor later static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); @@ -233,6 +242,7 @@ static void uvHandleReq(SSrvConn* pConn) { pHead->msgLen -= sizeof(STransUserMsg); } } + CONN_SHOULD_RELEASE(pConn, pHead); STransMsg transMsg; transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -257,8 +267,8 @@ static void uvHandleReq(SSrvConn* pConn) { ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, - TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); // no ref here } @@ -270,6 +280,8 @@ static void uvHandleReq(SSrvConn* pConn) { (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth +_RETURE: + return; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -350,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - conn->broken = false; + conn->broken = true; transUnrefSrvHandle(conn); } } @@ -407,6 +419,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; if (pConn->broken == true) { + // persist by transUnrefSrvHandle(pConn); return; } @@ -415,8 +428,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { } if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; } From 9d12273a9a13484a9b3f4ec9532674d67112a8d9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 19 Mar 2022 14:35:24 +0800 Subject: [PATCH 6/6] merge 3.0 --- source/libs/transport/src/transCli.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 126b6cb91f..842b5a1b4b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -844,7 +844,10 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; - pCtx->appCtx = *ctx; + + if (ctx != NULL) { + pCtx->appCtx = *ctx; + } assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not