diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 807a44a532..5ea53b5510 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); +typedef struct { + SEpSet epSet; +} SMEpSet; + +int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq); +int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq); + typedef struct { int8_t connType; int32_t pid; @@ -2725,6 +2732,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p } return buf; } + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 3e2f596784..ab26cfc155 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -54,12 +54,13 @@ typedef struct { uint16_t clientPort; SRpcMsg rpcMsg; int32_t rspLen; - void *pRsp; - void *pNode; + void * pRsp; + void * pNode; } SNodeMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); +typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *); typedef struct SRpcInit { uint16_t localPort; // local port @@ -80,22 +81,25 @@ typedef struct SRpcInit { RpcCfp cfp; // call back to retrieve the client auth info, for server app only - RpcAfp afp;; + RpcAfp afp; + + // user defined retry func + RpcRfp rfp; void *parent; } SRpcInit; typedef struct { - void *val; + void *val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcCtxVal; typedef struct { - int32_t msgType; - void *val; + int32_t msgType; + void * val; int32_t (*clone)(void *src, void **dst); - void (*freeFunc)(const void *arg); + void (*freeFunc)(const void *arg); } SRpcBrokenlinkVal; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c6bef560a9..0ea196f1a1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -844,6 +844,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { taosArrayDestroy(pReq->pFields); pReq->pFields = NULL; } +int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1; + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} +int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1; + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { SCoder encoder = {0}; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index ad50948a02..eaca9b0fc7 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,13 +63,14 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + int (*retry)(void* parent, SRpcMsg*, SEpSet*); - int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index ebb90338cd..fa517d6d61 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; + pRpc->retry = pInit->rfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.contLen = sizeof(SEpSet); - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - if (rpcMsg.pCont == NULL) return; - - memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet)); + SMEpSet msg = {.epSet = *pEpSet}; + int32_t len = tSerializeSMEpSet(NULL, 0, &msg); + rpcMsg.pCont = rpcMallocCont(len); + tSerializeSMEpSet(rpcMsg.pCont, len, &msg); rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.handle = thandle; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b81310b90b..b43b8a1e0c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -31,12 +31,8 @@ typedef struct SCliConn { int hThrdIdx; STransCtx ctx; - bool broken; // link broken or not - ConnStatus status; // - int release; // 1: release - // spi configure - char spi; - char secured; + bool broken; // link broken or not + ConnStatus status; // char* ip; uint32_t port; @@ -44,7 +40,6 @@ typedef struct SCliConn { // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; - } SCliConn; typedef struct SCliMsg { @@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); +static void cliAppCb(SCliConn* pConn, STransMsg* pMsg); + static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle); @@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) { TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); - conn->secured = pHead->secured; - if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { tTrace("except, server continue send while cli ignore it"); // transUnrefCliHandle(conn); @@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) { if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(conn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); @@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) { once = true; continue; } - (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + cliAppCb(pConn, &transMsg); + //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); @@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } +void cliAppCb(SCliConn* pConn, STransMsg* transMsg) { + SCliThrdObj* pThrd = pConn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + + if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) { + SMEpSet emsg = {0}; + tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg); + pTransInst->retry(pTransInst, transMsg, &(emsg.epSet)); + } else { + pTransInst->cfp(pTransInst->parent, transMsg, NULL); + } +} void transCloseClient(void* arg) { SCliObj* cli = arg; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ec66f3e8df..59a30051ef 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -30,7 +30,6 @@ typedef struct SSrvConn { uv_timer_t pTimer; queue queue; - int ref; int persist; // persist connection or not SConnBuffer readBuf; // read buf, int inType; @@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); uv_walk(thrd->loop, uvWalkCb, NULL); - // uv_loop_close(thrd->loop); - // uv_stop(thrd->loop); } } @@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_walk(thrd->loop, uvWalkCb, NULL); - // uv_loop_close(thrd->loop); - // uv_stop(thrd->loop); } else { destroyAllConn(thrd); } @@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) { if (handle == NULL) { return; } - SSrvConn* conn = handle; - int ref = T_REF_INC((SSrvConn*)handle); - UNUSED(ref); + tDebug("server conn %p ref count: %d", handle, ref); } void transUnrefSrvHandle(void* handle) {