From ad29b90cc2836473994d87ae81f7ccd32722a9a3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 28 Jun 2022 21:43:40 +0800 Subject: [PATCH 1/4] fix: fix rpc quit problem --- include/libs/transport/trpc.h | 2 +- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 24 ++++++++++++++++++++++++ source/libs/transport/src/transSvr.c | 22 +++------------------- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2b8c6a895e..27153b5efd 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -45,7 +45,7 @@ typedef struct SRpcHandleInfo { int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t persistHandle; // persist handle or not STraceId traceId; - // int64_t traceId; + int8_t hasEpSet; // app info void *ahandle; // app handle set by client diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 55db0b129a..1283946240 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -148,6 +148,7 @@ typedef struct { char release : 2; char secured : 2; char spi : 2; + char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; STraceId traceId; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index aba2e6957b..9e52fc21a3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -312,6 +312,7 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.info.ahandle = NULL; transMsg.info.traceId = pHead->traceId; + transMsg.info.hasEpSet = pHead->hasEpSet; SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; @@ -1014,6 +1015,23 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { *val = newVal; } } + +bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { + if (pResp == NULL || pResp->info.hasEpSet == 0) { + return false; + } + tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst); + int32_t tlen = tSerializeSEpSet(NULL, 0, dst); + + int32_t bufLen = pResp->contLen - tlen; + char* buf = rpcMallocCont(bufLen); + + memcpy(buf, pResp->pCont + tlen, bufLen); + + pResp->pCont = buf; + pResp->contLen = bufLen; + return true; +} int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1058,6 +1076,12 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } STraceId* trace = &pResp->info.traceId; + + if (cliTryToExtractEpSet(pResp, &pCtx->epSet)) { + char tbuf[256] = {0}; + EPSET_DEBUG_STR(&pCtx->epSet, tbuf); + tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + } if (pCtx->pSem != NULL) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 08363b3c7c..ed1abd10f7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -144,15 +144,6 @@ static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp static int32_t exHandlesMgt; -// void uvInitEnv(); -// void uvOpenExHandleMgt(int size); -// void uvCloseExHandleMgt(); -// int64_t uvAddExHandle(void* p); -// int32_t uvRemoveExHandle(int64_t refId); -// int32_t uvReleaseExHandle(int64_t refId); -// void uvDestoryExHandle(void* handle); -// SExHandle* uvAcquireExHandle(int64_t refId); - static void uvDestroyConn(uv_handle_t* handle); // server and worker thread @@ -350,16 +341,8 @@ void uvOnSendCb(uv_write_t* req, int status) { tTrace("conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { SSvrMsg* msg = transQueuePop(&conn->srvMsgs); - // if (msg->type == Release && conn->status != ConnNormal) { - // conn->status = ConnNormal; - // transUnrefSrvHandle(conn); - // reallocConnRef(conn); - // destroySmsg(msg); - // transQueueClear(&conn->srvMsgs); - // return; - //} destroySmsg(msg); - // send second data, just use for push + // send cached data if (!transQueueEmpty(&conn->srvMsgs)) { msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg->type == Register && conn->status == ConnAcquire) { @@ -396,7 +379,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { tError("fail to dispatch conn to work thread"); } uv_close((uv_handle_t*)req->data, uvFreeCb); - // taosMemoryFree(req->data); taosMemoryFree(req); } @@ -410,6 +392,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; + pHead->hasEpSet = pMsg->info.hasEpSet; if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; @@ -422,6 +405,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { transUnrefSrvHandle(pConn); } else { pHead->msgType = pMsg->msgType; + // set up resp msg type if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) pHead->msgType = pConn->inType + 1; } From ce21baaa86cf8e72acfaf3e016c366a619f77bb1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 29 Jun 2022 10:44:18 +0800 Subject: [PATCH 2/4] feat: enh redirect --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9e52fc21a3..6390855e43 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1026,7 +1026,7 @@ bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { int32_t bufLen = pResp->contLen - tlen; char* buf = rpcMallocCont(bufLen); - memcpy(buf, pResp->pCont + tlen, bufLen); + memcpy(buf, (char*)pResp->pCont + tlen, bufLen); pResp->pCont = buf; pResp->contLen = bufLen; From 5691b0e82f02bdc905b8814b7f89c782cda0dbb5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 29 Jun 2022 11:04:04 +0800 Subject: [PATCH 3/4] feat:modify code --- include/libs/transport/trpc.h | 2 +- source/libs/transport/inc/transComm.h | 7 ++----- source/libs/transport/src/transSvr.c | 2 -- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 27153b5efd..8471aa8286 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -123,7 +123,7 @@ void * rpcReallocCont(void *ptr, int32_t contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock +void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 1283946240..59f79db6fb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -378,13 +378,10 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); - +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue); +int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); - -// void transPrintEpSet(SEpSet* pEpSet); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* * init global func diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index ed1abd10f7..c190950a17 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1126,6 +1126,4 @@ _return2: rpcFreeCont(msg->pCont); } -int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } - #endif From f8cf9761f8ea6e2d0e1d5d4c2fcd4b1a0e5fe021 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 29 Jun 2022 16:18:23 +0800 Subject: [PATCH 4/4] fix: avoid compile error --- source/libs/transport/inc/transComm.h | 1 - source/libs/transport/src/trans.c | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 59f79db6fb..8183b7fd9f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -295,7 +295,6 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STra void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); -int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index cc2e95cfb3..bd8195462e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -141,7 +141,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } -int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } +int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);