add new interface
This commit is contained in:
parent
2d57e77df8
commit
188883341a
|
@ -158,18 +158,21 @@ void *rpcReallocCont(void *ptr, int64_t contLen);
|
||||||
// Because taosd supports multi-process mode
|
// Because taosd supports multi-process mode
|
||||||
// These functions should not be used on the server side
|
// These functions should not be used on the server side
|
||||||
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
|
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
|
||||||
int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
|
||||||
int rpcSendResponse(const SRpcMsg *pMsg);
|
int32_t rpcSendResponse(const SRpcMsg *pMsg);
|
||||||
int rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg);
|
||||||
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
|
||||||
|
|
||||||
// These functions will not be called in the child process
|
// These functions will not be called in the child process
|
||||||
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||||
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
int32_t rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
int32_t rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
|
||||||
int32_t timeoutMs);
|
int32_t timeoutMs);
|
||||||
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
|
||||||
void *rpcAllocHandle();
|
int32_t rpcFreeConnById(void *shandle, int64_t connId);
|
||||||
|
|
||||||
|
int32_t rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||||
|
int32_t rpcAllocHandle(int64_t *refId);
|
||||||
int32_t rpcSetIpWhite(void *thandl, void *arg);
|
int32_t rpcSetIpWhite(void *thandl, void *arg);
|
||||||
|
|
||||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
|
||||||
|
|
|
@ -17,11 +17,11 @@
|
||||||
#include "command.h"
|
#include "command.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "schInt.h"
|
#include "schInt.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tmisce.h"
|
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
||||||
|
@ -975,11 +975,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
||||||
|
|
||||||
if (isHb && persistHandle && trans->pHandle == 0) {
|
if (isHb && persistHandle && trans->pHandle == 0) {
|
||||||
trans->pHandle = rpcAllocHandle();
|
int64_t refId = 0;
|
||||||
if (NULL == trans->pHandle) {
|
code = rpcAllocHandle(&refId);
|
||||||
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno);
|
if (code != 0) {
|
||||||
SCH_ERR_JRET(terrno);
|
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
trans->pHandle = (void *)refId;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pJob && pTask) {
|
if (pJob && pTask) {
|
||||||
|
@ -1200,7 +1202,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
persistHandle = true;
|
persistHandle = true;
|
||||||
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
|
int64_t refId = 0;
|
||||||
|
code = rpcAllocHandle(&refId);
|
||||||
|
if (code != 0) {
|
||||||
|
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", code);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_SET_TASK_HANDLE(pTask, (void *)refId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
|
|
@ -207,7 +207,7 @@ typedef struct {
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
|
typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType;
|
||||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
|
@ -304,10 +304,10 @@ int32_t transClearBuffer(SConnBuffer* buf);
|
||||||
int32_t transDestroyBuffer(SConnBuffer* buf);
|
int32_t transDestroyBuffer(SConnBuffer* buf);
|
||||||
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||||
|
|
||||||
void transRefSrvHandle(void* handle);
|
void transRefSrvHandle(void* handle);
|
||||||
void transUnrefSrvHandle(void* handle);
|
void transUnrefSrvHandle(void* handle);
|
||||||
|
@ -315,21 +315,24 @@ void transUnrefSrvHandle(void* handle);
|
||||||
void transRefCliHandle(void* handle);
|
void transRefCliHandle(void* handle);
|
||||||
void transUnrefCliHandle(void* handle);
|
void transUnrefCliHandle(void* handle);
|
||||||
|
|
||||||
int transReleaseCliHandle(void* handle);
|
int32_t transReleaseCliHandle(void* handle);
|
||||||
int transReleaseSrvHandle(void* handle);
|
int32_t transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
|
||||||
int32_t timeoutMs);
|
int32_t timeoutMs);
|
||||||
int transSendResponse(const STransMsg* msg);
|
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
|
||||||
int transRegisterMsg(const STransMsg* msg);
|
int32_t transFreeConnById(void* shandle, int64_t transpointId);
|
||||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
|
||||||
|
int32_t transSendResponse(const STransMsg* msg);
|
||||||
|
int32_t transRegisterMsg(const STransMsg* msg);
|
||||||
|
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||||
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
|
||||||
|
|
||||||
int transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
|
||||||
|
|
||||||
int64_t transAllocHandle();
|
int32_t transAllocHandle(int64_t* refId);
|
||||||
|
|
||||||
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
|
|
|
@ -163,38 +163,44 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
|
||||||
return st + TRANS_MSG_OVERHEAD;
|
return st + TRANS_MSG_OVERHEAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||||
}
|
}
|
||||||
int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||||
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
|
||||||
}
|
}
|
||||||
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
|
||||||
|
int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
|
||||||
|
return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||||
}
|
}
|
||||||
int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
|
int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
|
||||||
int32_t timeoutMs) {
|
int32_t timeoutMs) {
|
||||||
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
|
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
|
||||||
}
|
}
|
||||||
|
int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); }
|
||||||
|
|
||||||
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
||||||
|
|
||||||
void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
|
void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
|
||||||
|
|
||||||
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
|
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
|
||||||
|
|
||||||
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
|
int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
|
||||||
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
|
int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
|
||||||
|
|
||||||
// client only
|
// client only
|
||||||
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||||
// later
|
// later
|
||||||
return transSetDefaultAddr(thandle, ip, fqdn);
|
return transSetDefaultAddr(thandle, ip, fqdn);
|
||||||
}
|
}
|
||||||
// server only
|
// server only
|
||||||
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
|
int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); }
|
||||||
|
|
||||||
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
|
int32_t rpcAllocHandle(int64_t* refId) { return transAllocHandle(refId); }
|
||||||
|
|
||||||
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
|
int32_t rpcUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { return transUtilSIpRangeToStr(pRange, buf); }
|
||||||
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
|
int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
|
||||||
|
|
|
@ -213,8 +213,10 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||||
cliHandleUpdate};
|
|
||||||
|
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||||
|
NULL, cliHandleUpdate, cliHandleFreeById};
|
||||||
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||||
/// NULL,cliHandleUpdate};
|
/// NULL,cliHandleUpdate};
|
||||||
|
|
||||||
|
@ -1589,6 +1591,36 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
pThrd->cvtAddr = pCtx->cvtAddr;
|
pThrd->cvtAddr = pCtx->cvtAddr;
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
}
|
}
|
||||||
|
static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||||
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||||
|
if (exh == NULL) {
|
||||||
|
tDebug("%" PRId64 " already released", refId);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&exh->latch);
|
||||||
|
SCliConn* conn = exh->handle;
|
||||||
|
taosRUnLockLatch(&exh->latch);
|
||||||
|
|
||||||
|
if (conn->refId != refId) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t size = transQueueSize(&conn->cliMsgs);
|
||||||
|
if (size == 0) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
while (T_REF_VAL_GET(conn) >= 1) transUnrefCliHandle(conn);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
_except:
|
||||||
|
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
|
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
|
||||||
STransConnCtx* pCtx = (*pMsg)->ctx;
|
STransConnCtx* pCtx = (*pMsg)->ctx;
|
||||||
|
@ -2759,7 +2791,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
|
||||||
SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
|
SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
|
||||||
return pThrd;
|
return pThrd;
|
||||||
}
|
}
|
||||||
int transReleaseCliHandle(void* handle) {
|
int32_t transReleaseCliHandle(void* handle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
|
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
|
@ -2823,25 +2855,25 @@ static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq
|
||||||
cliMsg->type = Normal;
|
cliMsg->type = Normal;
|
||||||
cliMsg->refId = (int64_t)shandle;
|
cliMsg->refId = (int64_t)shandle;
|
||||||
QUEUE_INIT(&cliMsg->seqq);
|
QUEUE_INIT(&cliMsg->seqq);
|
||||||
|
|
||||||
*pCliMsg = cliMsg;
|
*pCliMsg = cliMsg;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
if (pTransInst == NULL) {
|
if (pTransInst == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t handle = (int64_t)pReq->info.handle;
|
int64_t handle = (int64_t)pReq->info.handle;
|
||||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
|
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _except;);
|
||||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle != 0) {
|
if (handle != 0) {
|
||||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
|
||||||
if (exh != NULL) {
|
if (exh != NULL) {
|
||||||
|
@ -2849,26 +2881,27 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
if (exh->handle == NULL && exh->inited != 0) {
|
if (exh->handle == NULL && exh->inited != 0) {
|
||||||
SCliMsg* pCliMsg = NULL;
|
SCliMsg* pCliMsg = NULL;
|
||||||
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
|
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
|
||||||
ASSERT(code == 0);
|
if (code != 0) {
|
||||||
|
taosWUnLockLatch(&exh->latch);
|
||||||
|
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
||||||
taosWUnLockLatch(&exh->latch);
|
taosWUnLockLatch(&exh->latch);
|
||||||
tDebug("msg refId: %" PRId64 "", handle);
|
tDebug("msg refId: %" PRId64 "", handle);
|
||||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
} else {
|
||||||
exh->inited = 1;
|
exh->inited = 1;
|
||||||
taosWUnLockLatch(&exh->latch);
|
taosWUnLockLatch(&exh->latch);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SCliMsg* pCliMsg = NULL;
|
SCliMsg* pCliMsg = NULL;
|
||||||
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
|
TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg), NULL, _except);
|
||||||
if (code != 0) {
|
|
||||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
STraceId* trace = &pReq->info.traceId;
|
STraceId* trace = &pReq->info.traceId;
|
||||||
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||||
|
@ -2880,13 +2913,62 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
}
|
}
|
||||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_except:
|
||||||
|
transFreeMsg(pReq->pCont);
|
||||||
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
|
||||||
|
if (transpointId == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
if (pTransInst == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _except);
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
TAOS_CHECK_GOTO(transAllocHandle(transpointId), NULL, _except);
|
||||||
|
|
||||||
|
SCliThrd* pThrd = transGetWorkThrd(pTransInst, *transpointId);
|
||||||
|
if (pThrd == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), *transpointId);
|
||||||
|
if (exh == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
pReq->info.handle = (void*)(*transpointId);
|
||||||
|
|
||||||
|
SCliMsg* pCliMsg = NULL;
|
||||||
|
TAOS_CHECK_GOTO(transInitMsg(shandle, pEpSet, pReq, NULL, &pCliMsg), NULL, _except);
|
||||||
|
|
||||||
|
STraceId* trace = &pReq->info.traceId;
|
||||||
|
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||||
|
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
|
||||||
|
if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
|
||||||
|
destroyCmsg(pCliMsg);
|
||||||
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||||
|
}
|
||||||
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_except:
|
||||||
|
transFreeMsg(pReq->pCont);
|
||||||
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
if (pTransInst == NULL) {
|
if (pTransInst == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -2908,8 +2990,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
||||||
code = tsem_init(sem, 0, 0);
|
code = tsem_init(sem, 0, 0);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFree(sem);
|
taosMemoryFree(sem);
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1);
|
||||||
TAOS_CHECK_GOTO(code, NULL, _RETURN1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||||
|
@ -3003,13 +3084,13 @@ _EXIT:
|
||||||
taosMemoryFree(pSyncMsg);
|
taosMemoryFree(pSyncMsg);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
|
||||||
int32_t timeoutMs) {
|
int32_t timeoutMs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
if (pTransInst == NULL) {
|
if (pTransInst == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
|
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
|
||||||
|
@ -3096,12 +3177,11 @@ _RETURN2:
|
||||||
/*
|
/*
|
||||||
*
|
*
|
||||||
**/
|
**/
|
||||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
if (pTransInst == NULL) {
|
if (pTransInst == NULL) {
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCvtAddr cvtAddr = {0};
|
SCvtAddr cvtAddr = {0};
|
||||||
if (ip != NULL && fqdn != NULL) {
|
if (ip != NULL && fqdn != NULL) {
|
||||||
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
|
tstrncpy(cvtAddr.ip, ip, sizeof(cvtAddr.ip));
|
||||||
|
@ -3145,7 +3225,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t transAllocHandle() {
|
int32_t transAllocHandle(int64_t* refId) {
|
||||||
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -3166,5 +3246,39 @@ int64_t transAllocHandle() {
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
taosInitRWLatch(&exh->latch);
|
taosInitRWLatch(&exh->latch);
|
||||||
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
||||||
return exh->refId;
|
*refId = exh->refId;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int32_t transFreeConnById(void* shandle, int64_t transpointId) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
if (pTransInst == NULL) {
|
||||||
|
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||||
|
}
|
||||||
|
if (transpointId == 0) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCliThrd* pThrd = transGetWorkThrdFromHandle(pTransInst, transpointId);
|
||||||
|
if (pThrd == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCliMsg* pCli = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
|
if (pCli == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _except);
|
||||||
|
}
|
||||||
|
pCli->type = FreeById;
|
||||||
|
|
||||||
|
STransMsg msg = {.info.handle = (void*)transpointId};
|
||||||
|
|
||||||
|
code = transAsyncSend(pThrd->asyncPool, &pCli->q);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(pCli);
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _except);
|
||||||
|
}
|
||||||
|
|
||||||
|
_except:
|
||||||
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,7 +234,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
return (p->left == 0 || p->invalid) ? true : false;
|
return (p->left == 0 || p->invalid) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream, int keepalive) {
|
int32_t transSetConnOption(uv_tcp_t* stream, int keepalive) {
|
||||||
#if defined(WINDOWS) || defined(DARWIN)
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
#else
|
#else
|
||||||
return uv_tcp_keepalive(stream, 1, keepalive);
|
return uv_tcp_keepalive(stream, 1, keepalive);
|
||||||
|
@ -745,8 +745,7 @@ int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
|
||||||
return taosRemoveRef(refMgt, refId);
|
return taosRemoveRef(refMgt, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
|
void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle
|
||||||
// acquire extern handle
|
|
||||||
return (void*)taosAcquireRef(refMgt, refId);
|
return (void*)taosAcquireRef(refMgt, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1707,7 +1707,7 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int transReleaseSrvHandle(void* handle) {
|
int32_t transReleaseSrvHandle(void* handle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRpcHandleInfo* info = handle;
|
SRpcHandleInfo* info = handle;
|
||||||
SExHandle* exh = info->handle;
|
SExHandle* exh = info->handle;
|
||||||
|
@ -1747,7 +1747,7 @@ _return2:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendResponse(const STransMsg* msg) {
|
int32_t transSendResponse(const STransMsg* msg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (msg->info.noResp) {
|
if (msg->info.noResp) {
|
||||||
|
@ -1800,7 +1800,7 @@ _return2:
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int transRegisterMsg(const STransMsg* msg) {
|
int32_t transRegisterMsg(const STransMsg* msg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SExHandle* exh = msg->info.handle;
|
SExHandle* exh = msg->info.handle;
|
||||||
|
@ -1891,4 +1891,4 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
|
||||||
|
|
Loading…
Reference in New Issue