diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index f56860dd4f..6e2b83dce7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,9 +25,9 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlog.h" -#include "tsimplehash.h" #include "tmsg.h" #include "tmsgcb.h" +#include "tsimplehash.h" typedef enum { JOB_TASK_STATUS_NULL = 0, @@ -69,16 +69,16 @@ typedef enum { #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) #define QUERY_MSG_MASK_AUDIT() (1 << 1) #define QUERY_MSG_MASK_VIEW() (1 << 2) -#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) -#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0) -#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0) +#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0) +#define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_AUDIT()) != 0) +#define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0) typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision col_id_t numOfColumns; // the number of columns int16_t numOfPKs; - int32_t rowSize; // row size of the schema + int32_t rowSize; // row size of the schema } STableComInfo; typedef struct SIndexMeta { @@ -119,8 +119,9 @@ typedef struct STableMeta { int32_t sversion; int32_t tversion; STableComInfo tableInfo; - SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of the schema content. - SSchema schema[]; + SSchemaExt* schemaExt; // There is no additional memory allocation, and the pointer is fixed to the next address of + // the schema content. + SSchema schema[]; } STableMeta; #pragma pack(pop) @@ -196,9 +197,9 @@ typedef struct SBoundColInfo { } SBoundColInfo; typedef struct STableColsData { - char tbName[TSDB_TABLE_NAME_LEN]; - SArray* aCol; - bool getFromHash; + char tbName[TSDB_TABLE_NAME_LEN]; + SArray* aCol; + bool getFromHash; } STableColsData; typedef struct STableVgUid { @@ -207,15 +208,14 @@ typedef struct STableVgUid { } STableVgUid; typedef struct STableBufInfo { - void* pCurBuff; - SArray* pBufList; - int64_t buffUnit; - int64_t buffSize; - int64_t buffIdx; - int64_t buffOffset; + void* pCurBuff; + SArray* pBufList; + int64_t buffUnit; + int64_t buffSize; + int64_t buffIdx; + int64_t buffOffset; } STableBufInfo; - typedef struct STableDataCxt { STableMeta* pMeta; STSchema* pSchema; @@ -237,23 +237,22 @@ typedef struct SStbInterlaceInfo { void* pRequest; uint64_t requestId; int64_t requestSelf; - bool tbFromHash; + bool tbFromHash; SHashObj* pVgroupHash; SArray* pVgroupList; SSHashObj* pTableHash; int64_t tbRemainNum; STableBufInfo tbBuf; char firstName[TSDB_TABLE_NAME_LEN]; - STSchema *pTSchema; - STableDataCxt *pDataCtx; - void *boundTags; + STSchema* pTSchema; + STableDataCxt* pDataCtx; + void* boundTags; - bool tableColsReady; - SArray *pTableCols; - int32_t pTableColsIdx; + bool tableColsReady; + SArray* pTableCols; + int32_t pTableColsIdx; } SStbInterlaceInfo; - typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); @@ -308,6 +307,8 @@ void destroyAhandle(void* ahandle); int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* ctx); +int32_t asyncFreeConnById(void* pTransporter, int64_t pid); +; /** * Asynchronously send message to server, after the response received, the callback will be incured. * @@ -325,7 +326,7 @@ void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); -int32_t getAsofJoinReverseOp(EOperatorType op); +int32_t getAsofJoinReverseOp(EOperatorType op); int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); @@ -384,7 +385,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ - (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ + (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6c0d04354a..5b860cc23a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,6 +125,7 @@ typedef struct SRpcInit { int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; + int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait void *parent; } SRpcInit; @@ -158,18 +159,21 @@ void *rpcReallocCont(void *ptr, int64_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side // Please use tmsg functions, which are defined in tmsgcb.h -int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -int rpcSendResponse(const SRpcMsg *pMsg); -int rpcRegisterBrokenLinkArg(SRpcMsg *msg); -int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock +int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +int32_t rpcSendResponse(const SRpcMsg *pMsg); +int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg); +int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, +int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, int32_t timeoutMs); -int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void *rpcAllocHandle(); + +int32_t rpcFreeConnById(void *shandle, int64_t connId); + +int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +int32_t rpcAllocHandle(int64_t *refId); int32_t rpcSetIpWhite(void *thandl, void *arg); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3d758e1fd3..9bbfdd18b0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -387,6 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + rpcInit.notWaitAvaliableConn = 1; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 668d40dd0b..d295e868e9 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -202,6 +202,7 @@ typedef struct SExchangeInfo { SLimitInfo limitInfo; int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo char* pTaskId; + SArray* pFetchRpcHandles; } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..22bb6524f1 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -298,13 +298,13 @@ _end: pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - (*ppRes) = NULL; + (*ppRes) = NULL; return code; } static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = loadRemoteDataNext(pOperator, &pRes); + int32_t code = loadRemoteDataNext(pOperator, &pRes); return pRes; } @@ -346,6 +346,14 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources); return TSDB_CODE_INVALID_PARA; } + pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t)); + if (!pInfo->pFetchRpcHandles) { + return terrno; + } + void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources); + if (!ret) { + return terrno; + } pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); if (pInfo->pSources == NULL) { @@ -384,6 +392,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); + return initDataSource(numOfSources, pInfo, id); } @@ -391,7 +400,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = 0; int32_t lino = 0; SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -468,6 +477,14 @@ void freeSourceDataInfo(void* p) { void doDestroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; + for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) { + int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i); + if (*pRpcHandle > 0) { + SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i); + (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle); + } + } + taosArrayDestroy(pExInfo->pFetchRpcHandles); taosArrayDestroy(pExInfo->pSources); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); @@ -495,6 +512,8 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } int32_t index = pWrapper->sourceIndex; + int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index); + *pRpcHandle = -1; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); if (!pSourceDataInfo) { return terrno; @@ -668,6 +687,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas int64_t transporterId = 0; code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); QUERY_CHECK_CODE(code, lino, _end); + int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex); + *pRpcHandle = transporterId; } _end: @@ -690,7 +711,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo int32_t lino = 0; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - code = blockDecode(pRes, pData, (const char**) pNextStart); + code = blockDecode(pRes, pData, (const char**)pNextStart); if (code) { return code; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 9b61f81939..d47a183121 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -225,6 +225,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra .code = 0 }; TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); + int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); if (code) { destroySendMsgInfo(pInfo); @@ -235,6 +236,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } +int32_t asyncFreeConnById(void* pTransporter, int64_t pid) { + return rpcFreeConnById(pTransporter, pid); +} char* jobTaskStatusStr(int32_t status) { switch (status) { @@ -448,13 +452,13 @@ void parseTagDatatoJson(void* p, char** jsonStr) { if (value == NULL) { goto end; } - if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + if (!cJSON_AddItemToObject(json, tagJsonKey, value)) { goto end; } } else if (type == TSDB_DATA_TYPE_NCHAR) { cJSON* value = NULL; if (pTagVal->nData > 0) { - char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1); + char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1); if (tagJsonValue == NULL) { goto end; } @@ -479,7 +483,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) { goto end; } - if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + if (!cJSON_AddItemToObject(json, tagJsonKey, value)) { goto end; } } else if (type == TSDB_DATA_TYPE_DOUBLE) { @@ -488,7 +492,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) { if (value == NULL) { goto end; } - if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + if (!cJSON_AddItemToObject(json, tagJsonKey, value)) { goto end; } } else if (type == TSDB_DATA_TYPE_BOOL) { @@ -497,7 +501,7 @@ void parseTagDatatoJson(void* p, char** jsonStr) { if (value == NULL) { goto end; } - if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + if (!cJSON_AddItemToObject(json, tagJsonKey, value)) { goto end; } } else { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 08a8f684f5..9215254f9c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -17,11 +17,11 @@ #include "command.h" #include "query.h" #include "schInt.h" +#include "tglobal.h" +#include "tmisce.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" -#include "tglobal.h" -#include "tmisce.h" // clang-format off int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { @@ -975,11 +975,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask)); if (isHb && persistHandle && trans->pHandle == 0) { - trans->pHandle = rpcAllocHandle(); - if (NULL == trans->pHandle) { - SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno); - SCH_ERR_JRET(terrno); + int64_t refId = 0; + code = rpcAllocHandle(&refId); + if (code != 0) { + SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code); + SCH_ERR_JRET(code); } + trans->pHandle = (void *)refId; } if (pJob && pTask) { @@ -1200,7 +1202,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } persistHandle = true; - SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + int64_t refId = 0; + code = rpcAllocHandle(&refId); + if (code != 0) { + SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code); + SCH_ERR_JRET(code); + } + + SCH_SET_TASK_HANDLE(pTask, (void *)refId); break; } case TDMT_SCH_FETCH: diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e66941244c..820075787f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -148,7 +148,6 @@ typedef struct { STransSyncMsg* pSyncMsg; // for syncchronous with timeout API int64_t syncMsgRef; SCvtAddr cvtAddr; - bool setMaxRetry; int32_t retryMinInterval; int32_t retryMaxInterval; @@ -207,7 +206,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; +typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -304,10 +303,10 @@ int32_t transClearBuffer(SConnBuffer* buf); int32_t transDestroyBuffer(SConnBuffer* buf); int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); +int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); -int transSetConnOption(uv_tcp_t* stream, int keepalive); +int32_t transSetConnOption(uv_tcp_t* stream, int keepalive); void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); @@ -315,21 +314,24 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); -int transReleaseCliHandle(void* handle); -int transReleaseSrvHandle(void* handle); +int32_t transReleaseCliHandle(void* handle); +int32_t transReleaseSrvHandle(void* handle); -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, +int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs); -int transSendResponse(const STransMsg* msg); -int transRegisterMsg(const STransMsg* msg); -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId); +int32_t transFreeConnById(void* shandle, int64_t transpointId); + +int32_t transSendResponse(const STransMsg* msg); +int32_t transRegisterMsg(const STransMsg* msg); +int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); -int transSockInfo2Str(struct sockaddr* sockname, char* dst); +int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst); -int64_t transAllocHandle(); +int32_t transAllocHandle(int64_t* refId); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 7853e25cff..703a4dde3e 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -58,6 +58,8 @@ typedef struct { int32_t failFastThreshold; int32_t failFastInterval; + int8_t notWaitAvaliableConn; // 1: no delay, 0: delay + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index fbcc74e8e1..8b99443a84 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -102,6 +102,8 @@ void* rpcOpen(const SRpcInit* pInit) { if (pRpc->timeToGetConn == 0) { pRpc->timeToGetConn = 10 * 1000; } + pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn; + pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); @@ -163,38 +165,48 @@ void* rpcReallocCont(void* ptr, int64_t contLen) { return st + TRANS_MSG_OVERHEAD; } -int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { +int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { return transSendRequest(shandle, pEpSet, pMsg, NULL); } -int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - return transSendRequest(shandle, pEpSet, pMsg, pCtx); -} -int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - return transSendRecv(shandle, pEpSet, pMsg, pRsp); -} -int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, - int32_t timeoutMs) { - return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); +int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) { + return transSendRequest(shandle, pEpSet, pMsg, pCtx); + } else { + return transSendRequestWithId(shandle, pEpSet, pMsg, pRid); + } } -int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } +int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { + return transSendRequestWithId(shandle, pEpSet, pReq, transpointId); +} + +int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { + return transSendRecv(shandle, pEpSet, pMsg, pRsp); +} +int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs) { + return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); +} +int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); } + +int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); } void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } -int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } +int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } +int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } // client only -int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { +int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later return transSetDefaultAddr(thandle, ip, fqdn); } // server only int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); } -void* rpcAllocHandle() { return (void*)transAllocHandle(); } +int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); } int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); } int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2e61f19af8..4b324e52c6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -213,8 +213,10 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); -static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, - cliHandleUpdate}; +static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd); + +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, + NULL, cliHandleUpdate, cliHandleFreeById}; /// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, /// NULL,cliHandleUpdate}; @@ -660,7 +662,9 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { if (QUEUE_IS_EMPTY(&plist->conns)) { if (plist->list->numOfConn >= pTranInst->connLimitNum) { *exceed = true; + return NULL;; } + plist->list->numOfConn++; return NULL; } @@ -704,7 +708,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; - if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) { + if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) { tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); @@ -899,10 +903,12 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { exh->handle = conn; exh->pThrd = conn->hostThrd; taosWUnLockLatch(&exh->latch); - + conn->refId = exh->refId; taosWUnLockLatch(&exh->latch); + tDebug("conn %p specified by %"PRId64"", conn, handle); + (void)transReleaseExHandle(transGetRefMgt(), handle); return 0; } @@ -1035,7 +1041,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { list->size--; } } - conn->list = NULL; (void)transReleaseExHandle(transGetRefMgt(), conn->refId); @@ -1075,8 +1080,11 @@ static void cliDestroy(uv_handle_t* handle) { (void)atomic_sub_fetch_32(&pThrd->connCount, 1); + if (conn->refId > 0) { (void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + + } taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); @@ -1589,6 +1597,40 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { pThrd->cvtAddr = pCtx->cvtAddr; destroyCmsg(pMsg); } +static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code = 0; + int64_t refId = (int64_t)(pMsg->msg.info.handle); + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("id %" PRId64 " already released", refId); + destroyCmsg(pMsg); + return; + } + + taosRLockLatch(&exh->latch); + SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + + if (conn == NULL || conn->refId != refId) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); + } + tDebug("do free conn %p by id %" PRId64 "", conn, refId); + + int32_t size = transQueueSize(&conn->cliMsgs); + if (size == 0) { + // already recv, and notify upper layer + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); + return; + } else { + while (T_REF_VAL_GET(conn) >= 1) transUnrefCliHandle(conn); + } + return; +_exception: + tDebug("already free conn %p by id %" PRId64"", conn, refId); + + (void)transReleaseExHandle(transGetRefMgt(), refId); + destroyCmsg(pMsg); +} SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { STransConnCtx* pCtx = (*pMsg)->ctx; @@ -2759,7 +2801,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle); return pThrd; } -int transReleaseCliHandle(void* handle) { +int32_t transReleaseCliHandle(void* handle) { int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { @@ -2823,25 +2865,25 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; QUEUE_INIT(&cliMsg->seqq); + *pCliMsg = cliMsg; return 0; } -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception;); } + if (handle != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh != NULL) { @@ -2849,26 +2891,27 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (exh->handle == NULL && exh->inited != 0) { SCliMsg* pCliMsg = NULL; code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); - ASSERT(code == 0); + if (code != 0) { + taosWUnLockLatch(&exh->latch); + (void)transReleaseExHandle(transGetRefMgt(), handle); + TAOS_CHECK_GOTO(code, NULL, _exception); + } QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; + } else { + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + (void)transReleaseExHandle(transGetRefMgt(), handle); } - exh->inited = 1; - taosWUnLockLatch(&exh->latch); - (void)transReleaseExHandle(transGetRefMgt(), handle); } } SCliMsg* pCliMsg = NULL; - code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); - if (code != 0) { - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return code; - } + TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _exception); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -2880,13 +2923,63 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran } (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; + +_exception: + transFreeMsg(pReq->pCont); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; +} +int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { + if (transpointId == NULL) { + ASSERT(0); + return TSDB_CODE_INVALID_PARA; + } + int32_t code = 0; + + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception); + } + + TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _exception); + + SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _exception); + } + + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), *transpointId); + if (exh == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception); + } + + pReq->info.handle = (void*)(*transpointId); + + SCliMsg* pCliMsg = NULL; + TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _exception); + + STraceId* trace = &pReq->info.traceId; + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { + destroyCmsg(pCliMsg); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + } + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + +_exception: + transFreeMsg(pReq->pCont); + (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { +int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } int32_t code = 0; @@ -2908,8 +3001,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs code = tsem_init(sem, 0, 0); if (code != 0) { taosMemoryFree(sem); - code = TAOS_SYSTEM_ERROR(errno); - TAOS_CHECK_GOTO(code, NULL, _RETURN1); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1); } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -3003,13 +3095,13 @@ _EXIT: taosMemoryFree(pSyncMsg); return code; } -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, - int32_t timeoutMs) { +int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs) { int32_t code = 0; STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); @@ -3096,22 +3188,21 @@ _RETURN2: /* * **/ -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { +int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { + if (ip == NULL || fqdn == NULL) return TSDB_CODE_INVALID_PARA; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { - return TSDB_CODE_RPC_BROKEN_LINK; + return TSDB_CODE_RPC_MODULE_QUIT; } SCvtAddr cvtAddr = {0}; - if (ip != NULL && fqdn != NULL) { - tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip)); - tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); - cvtAddr.cvt = true; - } + tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip)); + tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); + cvtAddr.cvt = true; int32_t code = 0; - int8_t i = 0; - for (i = 0; i < pTransInst->numOfThreads; i++) { + for (int8_t i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); if (pCtx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -3136,7 +3227,9 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) { destroyCmsg(cliMsg); - code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) { + code = TSDB_CODE_RPC_MODULE_QUIT; + } break; } } @@ -3145,7 +3238,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { return code; } -int64_t transAllocHandle() { +int32_t transAllocHandle(int64_t* refId) { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); if (exh == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3166,5 +3259,43 @@ int64_t transAllocHandle() { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); - return exh->refId; + *refId = exh->refId; + return 0; +} +int32_t transFreeConnById(void* shandle, int64_t transpointId) { + int32_t code = 0; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + return TSDB_CODE_RPC_MODULE_QUIT; + } + if (transpointId == 0) { + tDebug("not free by refId:%"PRId64"", transpointId); + TAOS_CHECK_GOTO(0, NULL, _exception); + } + + SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception); + } + + SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (pCli == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } + pCli->type = FreeById; + + tDebug("release conn id %" PRId64 "", transpointId); + + STransMsg msg = {.info.handle = (void*)transpointId}; + pCli->msg = msg; + + code = transAsyncSend(pThrd->asyncPool, &pCli->q); + if (code != 0) { + taosMemoryFree(pCli); + TAOS_CHECK_GOTO(code, NULL, _exception); + } + +_exception: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 9df0ddb6f3..148f4d4e9a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -234,7 +234,7 @@ bool transReadComplete(SConnBuffer* connBuf) { return (p->left == 0 || p->invalid) ? true : false; } -int transSetConnOption(uv_tcp_t* stream, int keepalive) { +int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) { #if defined(WINDOWS) || defined(DARWIN) #else return uv_tcp_keepalive(stream, 1, keepalive); @@ -745,8 +745,7 @@ int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) { return taosRemoveRef(refMgt, refId); } -void* transAcquireExHandle(int32_t refMgt, int64_t refId) { - // acquire extern handle +void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle return (void*)taosAcquireRef(refMgt, refId); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1e0d54eb5b..0202fbc599 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1707,7 +1707,7 @@ void transUnrefSrvHandle(void* handle) { } } -int transReleaseSrvHandle(void* handle) { +int32_t transReleaseSrvHandle(void* handle) { int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; @@ -1747,7 +1747,7 @@ _return2: return code; } -int transSendResponse(const STransMsg* msg) { +int32_t transSendResponse(const STransMsg* msg) { int32_t code = 0; if (msg->info.noResp) { @@ -1800,7 +1800,7 @@ _return2: rpcFreeCont(msg->pCont); return code; } -int transRegisterMsg(const STransMsg* msg) { +int32_t transRegisterMsg(const STransMsg* msg) { int32_t code = 0; SExHandle* exh = msg->info.handle; @@ -1891,4 +1891,4 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { return code; } -int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } +int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }