Merge pull request #9938 from taosdata/feature/rpc

refactor rpc
This commit is contained in:
Yihao Deng 2022-01-20 21:03:28 +08:00 committed by GitHub
commit 80a07c38da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 27 deletions

View File

@ -199,4 +199,13 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx);
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
#endif #endif

View File

@ -17,13 +17,6 @@
#include "transComm.h" #include "transComm.h"
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
taosInitServer, taosInitClient}; taosInitServer, taosInitClient};
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
@ -46,10 +39,11 @@ void* rpcOpen(const SRpcInit* pInit) {
void rpcClose(void* arg) { void rpcClose(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg; SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
free(pRpc);
return; return;
} }
void* rpcMallocCont(int contLen) { void* rpcMallocCont(int contLen) {
int size = contLen + RPC_MSG_OVERHEAD; int size = contLen + TRANS_MSG_OVERHEAD;
char* start = (char*)calloc(1, (size_t)size); char* start = (char*)calloc(1, (size_t)size);
if (start == NULL) { if (start == NULL) {

View File

@ -21,6 +21,7 @@ typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
uv_write_t* writeReq; uv_write_t* writeReq;
SConnBuffer readBuf;
void* data; void* data;
queue conn; queue conn;
char spi; char spi;
@ -55,28 +56,92 @@ typedef struct SClientObj {
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
// process data read from server, auth/decompress etc
static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket
static void clientWriteCb(uv_write_t* req, int status); static void clientWriteCb(uv_write_t* req, int status);
// callback after conn to server
static void clientConnCb(uv_connect_t* req, int status); static void clientConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle); static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle); static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn); static void clientConnDestroy(SCliConn* pConn);
static void clientMsgDestroy(SCliMsg* pMsg);
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) {
// impl
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static bool clientReadComplete(SConnBuffer* data) {
STransMsgHead head;
int32_t headLen = sizeof(head);
if (data->len >= headLen) {
memcpy((char*)&head, data->buf, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
return true;
}
} else {
return false;
}
}
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
// impl later // impl later
static const int CAPACITY = 512;
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
tDebug("alread read complete pack");
clientProcessData(conn);
} else {
tDebug("read halp packet, continue to read");
}
return; return;
} }
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
}
// //
uv_close((uv_handle_t*)handle, clientDestroy); uv_close((uv_handle_t*)handle, clientDestroy);
} }
@ -164,7 +229,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->writeReq->data = conn; conn->writeReq->data = conn;
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = malloc(sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
@ -235,9 +300,23 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
} }
return cli; return cli;
} }
static void clientMsgDestroy(SCliMsg* pMsg) {
// impl later
free(pMsg);
}
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later // impl later
SClientObj* cli = arg; SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = cli->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->cliAsync);
free(pThrd->loop);
free(pThrd);
}
free(cli->pThreadObj);
free(cli);
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
@ -247,19 +326,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pRpc = (SRpcInfo*)shandle;
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later
}
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pRpc = (SRpcInfo*)shandle; pCtx->pRpc = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
// pContext->contLen = len;
// pContext->pCont = pMsg->pCont;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
pCtx->port = port; pCtx->port = port;
// pContext->epSet = *pEpSet;
// pContext->oldInUse = pEpSet->inUse;
assert(pRpc->connType == TAOS_CONN_CLIENT); assert(pRpc->connType == TAOS_CONN_CLIENT);
// atomic or not // atomic or not

View File

@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
} }
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont); // SRpcHead* pHead = rpcHeadFromCont(pCont);
bool succ = false; bool succ = false;
int overhead = sizeof(STransCompMsg); int overhead = sizeof(STransCompMsg);
@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
return pHead; return pHead;
} }
void transConnCtxDestroy(STransConnCtx* ctx) {
free(ctx->ip);
free(ctx);
}
#endif #endif

View File

@ -16,13 +16,6 @@
#ifdef USE_UV #ifdef USE_UV
#include "transComm.h" #include "transComm.h"
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
typedef struct SConn { typedef struct SConn {
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t* pWriter; uv_write_t* pWriter;
@ -100,7 +93,8 @@ static void* acceptThread(void* arg);
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
/* /*
* formate of data buffer: * formate of data buffer:
* |<-------SRpcReqContext------->|<------------data read from socket----------->| * |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/ */
static const int CAPACITY = 1024; static const int CAPACITY = 1024;
@ -133,7 +127,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
// //
static bool readComplete(SConnBuffer* data) { static bool readComplete(SConnBuffer* data) {
// TODO(yihao): handle pipeline later // TODO(yihao): handle pipeline later
// SRpcHead rpcHead;
STransMsgHead head; STransMsgHead head;
int32_t headLen = sizeof(head); int32_t headLen = sizeof(head);
if (data->len >= headLen) { if (data->len >= headLen) {
@ -270,13 +263,13 @@ static void uvProcessData(SConn* pConn) {
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt // opt
SConn* ctx = cli->data; SConn* conn = cli->data;
SConnBuffer* pBuf = &ctx->connBuf; SConnBuffer* pBuf = &conn->connBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (readComplete(pBuf)) { if (readComplete(pBuf)) {
tDebug("alread read complete packet"); tDebug("alread read complete packet");
uvProcessData(ctx); uvProcessData(conn);
} else { } else {
tDebug("read half packet, continue to read"); tDebug("read half packet, continue to read");
} }
@ -496,6 +489,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2]; int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
@ -530,6 +524,18 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
// impl later // impl later
SServerObj* srv = arg; SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* pThrd = srv->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
free(srv->pipe[i]);
free(pThrd->loop);
free(pThrd);
}
free(srv->loop);
free(srv->pipe);
free(srv->pThreadObj);
pthread_join(srv->thread, NULL);
free(srv);
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void rpcSendResponse(const SRpcMsg* pMsg) {