diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c82b7c0532..73427446e6 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -119,6 +119,13 @@ typedef struct SExHandle { void* pThrd; } SExHandle; +typedef struct { + STransMsg* pRsp; + tsem_t* pSem; + int8_t inited; + SRWLatch latch; +} STransSyncMsg; + /*convet from fqdn to ip */ typedef struct SCvtAddr { char ip[TSDB_FQDN_LEN]; @@ -133,11 +140,13 @@ typedef struct { tmsg_t msgType; // message type int8_t connType; // connection type cli/srv - STransCtx appCtx; // - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SCvtAddr cvtAddr; - bool setMaxRetry; + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + STransSyncMsg* pSyncMsg; // for syncchronous with timeout API + int64_t syncMsgRef; + SCvtAddr cvtAddr; + bool setMaxRetry; int32_t retryMinInterval; int32_t retryMaxInterval; @@ -307,6 +316,7 @@ int transReleaseSrvHandle(void* handle); int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs); int transSendResponse(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); @@ -432,10 +442,11 @@ int64_t transAddExHandle(int32_t refMgt, void* p); int32_t transRemoveExHandle(int32_t refMgt, int64_t refId); void* transAcquireExHandle(int32_t refMgt, int64_t refId); int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); -void transDestoryExHandle(void* handle); +void transDestroyExHandle(void* handle); int32_t transGetRefMgt(); int32_t transGetInstMgt(); +int32_t transGetSyncMsgMgt(); void transHttpEnvDestroy(); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 7b1ae087f2..6842d9ee82 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -169,6 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { return transSendRecv(shandle, pEpSet, pMsg, pRsp); } +int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) { + return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs); +} int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 677e08ec56..3156a0733c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2411,15 +2411,24 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } } - if (pCtx->pSem != NULL) { + if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); - if (pCtx->pRsp == NULL) { - tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); + if (pCtx->pSem) { + if (pCtx->pRsp == NULL) { + tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); + } else { + memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + } + tsem_post(pCtx->pSem); + pCtx->pRsp = NULL; } else { - memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); + if (pSyncMsg != NULL) { + memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); + tsem_post(pSyncMsg->pSem); + taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); + } } - tsem_post(pCtx->pSem); - pCtx->pRsp = NULL; } else { tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (retry == false && hasEpSet == true) { @@ -2563,7 +2572,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran } int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); return -1; @@ -2587,7 +2597,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->pSem = sem; - pCtx->pRsp = pRsp; + pCtx->pRsp = pTransRsp; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; @@ -2607,10 +2617,84 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs } tsem_wait(sem); + memcpy(pRsp, pTransRsp, sizeof(STransMsg)); + _RETURN: tsem_destroy(sem); taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosMemoryFree(pTransRsp); + return ret; +} +int64_t transCreateSyncMsg(STransMsg* pTransMsg) { + tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); + tsem_init(sem, 0, 0); + + STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + + taosInitRWLatch(&pSyncMsg->latch); + pSyncMsg->inited = 0; + pSyncMsg->pRsp = pTransMsg; + pSyncMsg->pSem = sem; + + return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); +} +int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return -1; + } + + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); + + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + epsetAssign(&pCtx->epSet, pEpSet); + epsetAssign(&pCtx->origEpSet, pEpSet); + pCtx->ahandle = pReq->info.ahandle; + pCtx->msgType = pReq->msgType; + pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + + int64_t ref = pCtx->syncMsgRef; + STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); + + SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->msg = *pReq; + cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; + + STraceId* trace = &pReq->info.traceId; + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); + + int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (ret != 0) { + destroyCmsg(cliMsg); + goto _RETURN; + } + + ret = tsem_timewait(pSyncMsg->pSem, timeoutMs); + if (ret < 0) { + pRsp->code = TSDB_CODE_TIMEOUT_ERROR; + ret = TSDB_CODE_TIMEOUT_ERROR; + } else { + memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); + ret = 0; + } +_RETURN: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosReleaseRef(transGetSyncMsgMgt(), ref); + taosRemoveRef(transGetSyncMsgMgt(), ref); return ret; } /* diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3dc59a93ee..759a4d79db 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -21,6 +21,9 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; static int32_t instMgt; +static int32_t transSyncMsgMgt; + +void transDestroySyncMsg(void* msg); int32_t transCompressMsg(char* msg, int32_t len) { int32_t ret = 0; @@ -601,13 +604,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } static void transInitEnv() { - refMgt = transOpenRefMgt(50000, transDestoryExHandle); + refMgt = transOpenRefMgt(50000, transDestroyExHandle); instMgt = taosOpenRef(50, rpcCloseImpl); + transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } static void transDestroyEnv() { transCloseRefMgt(refMgt); transCloseRefMgt(instMgt); + transCloseRefMgt(transSyncMsgMgt); } void transInit() { @@ -617,6 +622,7 @@ void transInit() { int32_t transGetRefMgt() { return refMgt; } int32_t transGetInstMgt() { return instMgt; } +int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } void transCleanup() { // clean env @@ -648,13 +654,24 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) { // release extern handle return taosReleaseRef(refMgt, refId); } -void transDestoryExHandle(void* handle) { +void transDestroyExHandle(void* handle) { if (handle == NULL) { return; } taosMemoryFree(handle); } +void transDestroySyncMsg(void* msg) { + if (msg == NULL) return; + + STransSyncMsg* pSyncMsg = msg; + tsem_destroy(pSyncMsg->pSem); + taosMemoryFree(pSyncMsg->pSem); + + taosMemoryFree(pSyncMsg->pRsp); + taosMemoryFree(pSyncMsg); +} + // void subnetIp2int(const char* const ip_addr, uint8_t* dst) { // char ip_addr_cpy[20]; // char ip[5];