refactor transport

This commit is contained in:
yihaoDeng 2024-08-23 11:50:54 +08:00
parent cc0a3e3683
commit a7d76aaaf2
5 changed files with 287 additions and 302 deletions

View File

@ -142,7 +142,7 @@ typedef struct {
tmsg_t msgType; // message type
int8_t connType; // connection type cli/srv
STransCtx appCtx; //
STransCtx userCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
@ -318,24 +318,24 @@ void transUnrefCliHandle(void* handle);
int32_t transReleaseCliHandle(void* handle);
int32_t transReleaseSrvHandle(void* handle);
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx);
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp);
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx);
int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp);
int32_t transSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs);
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
int32_t transFreeConnById(void* shandle, int64_t transpointId);
int32_t transSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
int32_t transFreeConnById(void* pInit, int64_t transpointId);
int32_t transSendResponse(const STransMsg* msg);
int32_t transRegisterMsg(const STransMsg* msg);
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
int32_t transSetDefaultAddr(void* pInit, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* pInit, void* arg, FilteFunc* func);
int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
int32_t transAllocHandle(int64_t* refId);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void transCloseClient(void* arg);
void transCloseServer(void* arg);

View File

@ -32,8 +32,8 @@
extern "C" {
#endif
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void taosCloseServer(void* arg);
void taosCloseClient(void* arg);

View File

@ -15,7 +15,7 @@
#include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* pInit) = {
transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
@ -103,7 +103,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->timeToGetConn = 10 * 1000;
}
pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
pRpc->tcphandle =
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
@ -167,29 +167,29 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
return st + TRANS_MSG_OVERHEAD;
}
int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL);
int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(pInit, pEpSet, pMsg, NULL);
}
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) {
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
return transSendRequest(pInit, pEpSet, pMsg, pCtx);
} else {
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
return transSendRequestWithId(pInit, pEpSet, pMsg, pRid);
}
}
int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
int32_t rpcSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
return transSendRequestWithId(pInit, pEpSet, pReq, transpointId);
}
int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
int32_t rpcSendRecv(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(pInit, pEpSet, pMsg, pRsp);
}
int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
int32_t rpcSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) {
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
return transSendRecvWithTimeout(pInit, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
}
int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); }
int32_t rpcFreeConnById(void* pInit, int64_t connId) { return transFreeConnById(pInit, connId); }
int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }

File diff suppressed because it is too large Load Diff

View File

@ -1399,7 +1399,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
srv->numOfWorkerReady++;
}
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit) {
int32_t code = 0;
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
@ -1463,9 +1463,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pInst = shandle;
thrd->pInst = pInit;
thrd->quit = false;
thrd->pInst = shandle;
thrd->pInst = pInit;
thrd->pWhiteList = uvWhiteListCreate();
srv->pThreadObj[i] = thrd;
@ -1494,9 +1494,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
thrd->pInst = shandle;
thrd->pInst = pInit;
thrd->quit = false;
thrd->pInst = shandle;
thrd->pInst = pInit;
thrd->pWhiteList = uvWhiteListCreate();
if (thrd->pWhiteList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;