Merge pull request #10718 from taosdata/feature/supportQuery

update transport
This commit is contained in:
Yihao Deng 2022-03-14 16:21:15 +08:00 committed by GitHub
commit 4189328ddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 102 deletions

View File

@ -71,6 +71,10 @@ typedef struct SRpcInit {
// call back to keep conn or not // call back to keep conn or not
bool (*pfp)(void *parent, tmsg_t msgType); bool (*pfp)(void *parent, tmsg_t msgType);
// to support Send messages multiple times on a link
//
void* (*mfp)(void *parent, tmsg_t msgType);
void *parent; void *parent;
} SRpcInit; } SRpcInit;
@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
// just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle);
void rpcRefHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type);

View File

@ -120,6 +120,10 @@ typedef struct {
// SEpSet* pSet; // for synchronous API // SEpSet* pSet; // for synchronous API
} SRpcReqContext; } SRpcReqContext;
typedef SRpcMsg STransMsg;
typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo;
typedef struct { typedef struct {
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
@ -134,7 +138,7 @@ typedef struct {
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
int hThrdIdx; int hThrdIdx;
@ -249,4 +253,15 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle); void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle); void transUnrefCliHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* pMsg);
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void transCloseClient(void* arg);
void transCloseServer(void* arg);
#endif #endif

View File

@ -64,6 +64,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType); bool (*pfp)(void* parent, tmsg_t msgType);
void* (*mfp)(void* parent, tmsg_t msgType);
int32_t refCount; int32_t refCount;
void* parent; void* parent;

View File

@ -18,8 +18,9 @@
#include "transComm.h" #include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
taosInitServer, taosInitClient}; transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void* rpcOpen(const SRpcInit* pInit) { void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
@ -34,11 +35,12 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->pfp = pInit->pfp; pRpc->pfp = pInit->pfp;
pRpc->mfp = pInit->mfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} else { } else {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} }
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
@ -116,6 +118,24 @@ int32_t rpcInit() {
return 0; return 0;
} }
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg);
}
void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRecv(shandle, ip, port, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg *pMsg) {
transSendResponse(pMsg);
}
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return transGetConnInfo((void *)thandle, pInfo);
}
void rpcCleanup(void) { void rpcCleanup(void) {
// impl later // impl later
// //
@ -129,6 +149,7 @@ void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle); (*taosRefHandle[type])(handle);
} }
void rpcUnrefHandle(void* handle, int8_t type) { void rpcUnrefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);

View File

@ -42,7 +42,7 @@ typedef struct SCliConn {
typedef struct SCliMsg { typedef struct SCliMsg {
STransConnCtx* ctx; STransConnCtx* ctx;
SRpcMsg msg; STransMsg msg;
queue q; queue q;
uint64_t st; uint64_t st;
} SCliMsg; } SCliMsg;
@ -105,9 +105,9 @@ static void cliHandleExcept(SCliConn* conn);
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(STransMsg* userdata);
static int cliRBChoseIdx(SRpcInfo* pTransInst); static int cliRBChoseIdx(STrans* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
@ -118,7 +118,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
@ -135,15 +135,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \ } \
} while (0); } while (0);
#define CONN_SET_PERSIST_BY_APP(conn) \
do { \
if (conn->persist == false) { \
conn->persist = true; \
transRefCliHandle(conn); \
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
static void* cliNotifyApp() {} void cliHandleResp(SCliConn* conn) {
static void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx;
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
@ -152,19 +157,29 @@ static void cliHandleResp(SCliConn* conn) {
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
SRpcMsg rpcMsg = {0}; STransMsg rpcMsg = {0};
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.pCont = transContFromHead((char*)pHead);
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = NULL;
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
// if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
transRefCliHandle(conn); CONN_SET_PERSIST_BY_APP(conn);
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
conn->persist = 1;
tDebug("cli conn %p persist by app", conn);
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
@ -173,7 +188,7 @@ static void cliHandleResp(SCliConn* conn) {
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
@ -184,8 +199,7 @@ static void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
// user owns conn->persist = 1 if (CONN_NO_PERSIST_BY_APP(conn)) {
if (conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
} }
destroyCmsg(conn->data); destroyCmsg(conn->data);
@ -196,24 +210,32 @@ static void cliHandleResp(SCliConn* conn) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void cliHandleExcept(SCliConn* pConn) {
void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL) {
// handle conn except in conn pool if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return; return;
} }
}
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
SRpcMsg rpcMsg = {0}; STransMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = pMsg->msg.msgType + 1; rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
rpcMsg.ahandle = NULL;
if (pCtx->pSem == NULL) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL;
} else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
}
if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
@ -228,9 +250,9 @@ static void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
static void cliTimeoutCb(uv_timer_t* handle) { void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; STrans* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label);
@ -252,11 +274,12 @@ static void cliTimeoutCb(uv_timer_t* handle) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
static void* createConnPool(int size) {
void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
} }
static void* destroyConnPool(void* pool) { void* destroyConnPool(void* pool) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (!QUEUE_IS_EMPTY(&connList->conn)) {
@ -301,7 +324,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
@ -358,6 +381,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->persist = false;
conn->broken = false; conn->broken = false;
transRefCliHandle(conn); transRefCliHandle(conn);
return conn; return conn;
@ -395,16 +419,16 @@ static void cliSendCb(uv_write_t* req, int status) {
uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb);
} }
static void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn); CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen); int msgLen = transMsgLenFromCont(pMsg->contLen);
@ -442,7 +466,8 @@ static void cliSend(SCliConn* pConn) {
_RETURE: _RETURE:
return; return;
} }
static void cliConnCb(uv_connect_t* req, int status) {
void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
@ -472,11 +497,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) { if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
@ -487,13 +512,14 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} }
return conn; return conn;
} }
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliConn* conn = cliGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
@ -514,6 +540,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
} }
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
static void cliAsyncCb(uv_async_t* handle) { static void cliAsyncCb(uv_async_t* handle) {
@ -551,10 +578,10 @@ static void* cliWorkThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SCliObj* cli = calloc(1, sizeof(SCliObj)); SCliObj* cli = calloc(1, sizeof(SCliObj));
SRpcInfo* pRpc = shandle; STrans* pRpc = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads; cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
@ -573,7 +600,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return cli; return cli;
} }
static void destroyUserdata(SRpcMsg* userdata) { static void destroyUserdata(STransMsg* userdata) {
if (userdata->pCont == NULL) { if (userdata->pCont == NULL) {
return; return;
} }
@ -629,12 +656,20 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx); free(ctx);
} }
// //
static void cliSendQuit(SCliThrdObj* thrd) { void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
void taosCloseClient(void* arg) { int cliRBChoseIdx(STrans* pTransInst) {
int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0;
}
return index % pTransInst->numOfThreads;
}
void transCloseClient(void* arg) {
SCliObj* cli = arg; SCliObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
cliSendQuit(cli->pThreadObj[i]); cliSendQuit(cli->pThreadObj[i]);
@ -643,13 +678,6 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
static int cliRBChoseIdx(SRpcInfo* pTransInst) {
int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0;
}
return index % pTransInst->numOfThreads;
}
void transRefCliHandle(void* handle) { void transRefCliHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
@ -665,17 +693,11 @@ void transUnrefCliHandle(void* handle) {
if (ref == 0) { if (ref == 0) {
cliDestroyConn((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
// unref cli handle
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pTransInst = (SRpcInfo*)shandle; void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
STrans* pTransInst = (STrans*)shandle;
int index = CONN_HOST_THREAD_INDEX(pMsg->handle); int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
@ -683,6 +705,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later // imp later
} }
tDebug("send request at thread:%d %p", index, pMsg);
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
@ -701,13 +724,8 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { STrans* pTransInst = (STrans*)shandle;
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pTransInst = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
@ -734,7 +752,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
tsem_wait(pSem); tsem_wait(pSem);
tsem_destroy(pSem); tsem_destroy(pSem);
free(pSem); free(pSem);
return;
} }
#endif #endif

View File

@ -37,8 +37,7 @@ typedef struct SSrvConn {
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
// SRpcMsg sendMsg;
// del later
char secured; char secured;
int spi; int spi;
char info[64]; char info[64];
@ -49,7 +48,7 @@ typedef struct SSrvConn {
typedef struct SSrvMsg { typedef struct SSrvMsg {
SSrvConn* pConn; SSrvConn* pConn;
SRpcMsg msg; STransMsg msg;
queue q; queue q;
} SSrvMsg; } SSrvMsg;
@ -207,20 +206,20 @@ static void uvHandleReq(SSrvConn* pConn) {
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; STrans* pRpc = (STrans*)p->shandle;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
if (transDecompressMsg(NULL, 0, NULL)) { if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later // add compress later
// pHead = rpcDecompressRpcMsg(pHead); // pHead = rpcDecompresSTransMsg(pHead);
} else { } else {
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
// impl later // impl later
// //
} }
SRpcMsg rpcMsg; STransMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
@ -260,7 +259,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
tError("server conn %p read error: %s", conn, uv_err_name(nread)); tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0 || nread == UV_EOF) { if (nread < 0) {
conn->broken = true; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
@ -319,7 +318,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
tTrace("server conn %p prepare to send resp", smsg->pConn); tTrace("server conn %p prepare to send resp", smsg->pConn);
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
SRpcMsg* pMsg = &smsg->msg; STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
@ -547,7 +546,7 @@ static bool addHandleToWorkloop(void* arg) {
return false; return false;
} }
// SRpcInfo* pRpc = pThrd->shandle; // STrans* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd); uv_pipe_open(pThrd->pipe, pThrd->fd);
@ -668,7 +667,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
return msgLen; return msgLen;
} }
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj)); SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThreads = numOfThreads; srv->numOfThreads = numOfThreads;
@ -720,7 +719,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return srv; return srv;
End: End:
taosCloseServer(srv); transCloseServer(srv);
return NULL; return NULL;
} }
@ -740,7 +739,7 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
void taosCloseServer(void* arg) { void transCloseServer(void* arg) {
// impl later // impl later
SServerObj* srv = arg; SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
@ -786,7 +785,7 @@ void transUnrefSrvHandle(void* handle) {
} }
// unref srv handle // unref srv handle
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void transSendResponse(const STransMsg* pMsg) {
if (pMsg->handle == NULL) { if (pMsg->handle == NULL) {
return; return;
} }
@ -799,14 +798,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
tTrace("server conn %p start to send resp", pConn); tTrace("server conn %p start to send resp", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
SSrvConn* pConn = thandle; SSrvConn* pConn = thandle;
struct sockaddr_in addr = pConn->addr; struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
pInfo->clientPort = ntohs(addr.sin_port); pInfo->clientPort = ntohs(addr.sin_port);
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
return 0; return 0;
} }