add libuv

This commit is contained in:
yihaoDeng 2022-01-15 18:20:53 +08:00
parent 41f600698c
commit fdb79077c4
1 changed files with 98 additions and 110 deletions

View File

@ -66,10 +66,31 @@ typedef struct {
struct SRpcConn* connList; // connection list struct SRpcConn* connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SEpSet* pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
#ifdef USE_UV #ifdef USE_UV
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
static const char* notify = "a"; static const char* notify = "a";
typedef struct SThreadObj { typedef struct SThreadObj {
@ -94,12 +115,12 @@ typedef struct SServerObj {
uint32_t port; uint32_t port;
} SServerObj; } SServerObj;
typedef struct SContent { typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;
int cap; int cap;
int toRead; int left;
} SContent; } SConnBuffer;
typedef struct SConnCtx { typedef struct SConnCtx {
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
@ -110,18 +131,18 @@ typedef struct SConnCtx {
queue queue; queue queue;
int ref; int ref;
int persist; // persist connection or not int persist; // persist connection or not
SContent pCont; SConnBuffer connBuf;
int count; int count;
} SConnCtx; } SConnCtx;
static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void onTimeout(uv_timer_t* handle); static void uvOnTimeoutCb(uv_timer_t* handle);
static void onWrite(uv_write_t* req, int status); static void uvOnWriteCb(uv_write_t* req, int status);
static void onAccept(uv_stream_t* stream, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void workerAsyncCB(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static SConnCtx* connCtxCreate(); static SConnCtx* connCtxCreate();
static void connCtxDestroy(SConnCtx* ctx); static void connCtxDestroy(SConnCtx* ctx);
@ -193,95 +214,68 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc; return pRpc;
} }
void rpcClose(void* arg) { return; }
void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
static const int CAPACITY = 1024; static const int CAPACITY = 1024;
tDebug("pre alloc buffer for read "); /*
SConnCtx* ctx = handle->data; * formate of data buffer:
SContent* pCont = &ctx->pCont; * |<-------SRpcReqContext------->|<------------data read from socket----------->|
if (pCont->cap == 0) { */
pCont->buf = (char*)calloc(CAPACITY, sizeof(char));
pCont->len = 0;
pCont->cap = CAPACITY;
pCont->toRead = -1;
buf->base = pCont->buf; SConnCtx* ctx = handle->data;
SConnBuffer* pBuf = &ctx->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
buf->len = CAPACITY; buf->len = CAPACITY;
} else { } else {
if (pCont->len >= pCont->cap) { if (pBuf->len >= pBuf->cap) {
if (pCont->toRead == -1) { if (pBuf->left == -1) {
pCont->cap *= 2; pBuf->cap *= 2;
pCont->buf = realloc(pCont->buf, pCont->cap); pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
} else if (pCont->len + pCont->toRead > pCont->cap) { } else if (pBuf->len + pBuf->left > pBuf->cap) {
pCont->cap = pCont->len + pCont->toRead; pBuf->cap = pBuf->len + pBuf->left;
pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead); pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
} }
} }
buf->base = pCont->buf + pCont->len; buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
buf->len = pCont->cap - pCont->len; buf->len = pBuf->cap - pBuf->len;
} }
// if (ctx->pCont.cap == 0) {
// ctx->pCont.buf = (char*)calloc(64, sizeof(char));
// ctx->pCont.len = 0;
// ctx->pCont.cap = 64;
// //
// buf->base = ctx->pCont.buf;
// buf->len = sz;
//} else {
// if (ctx->pCont.len + sz > ctx->pCont.cap) {
// ctx->pCont.cap *= 2;
// ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap);
// }
// buf->base = ctx->pCont.buf + ctx->pCont.len;
// buf->len = sz;
//}
} }
// change later // check data read from socket completely or not
static bool handleUserData(SContent* data) { //
static bool isReadAll(SConnBuffer* data) {
// TODO(yihao): handle pipeline later
SRpcHead rpcHead; SRpcHead rpcHead;
bool finish = false;
int32_t msgLen, leftLen, retLen;
int32_t headLen = sizeof(rpcHead); int32_t headLen = sizeof(rpcHead);
if (data->len >= headLen) { if (data->len >= headLen) {
memcpy((char*)&rpcHead, data->buf, headLen); memcpy((char*)&rpcHead, data->buf, headLen);
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (msgLen + headLen <= data->len) { if (msgLen > data->len) {
return true; data->left = msgLen - data->len;
} else {
return false; return false;
} else {
return true;
} }
} else { } else {
return false; return false;
} }
} }
void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt // opt
SConnCtx* ctx = cli->data; SConnCtx* ctx = cli->data;
SContent* pCont = &ctx->pCont; SConnBuffer* pBuf = &ctx->connBuf;
if (nread > 0) { if (nread > 0) {
pCont->len += nread; pBuf->len += nread;
bool finish = handleUserData(pCont); if (isReadAll(pBuf)) {
if (finish == false) { tDebug("alread read complete packet");
tDebug("continue read");
} else { } else {
tDebug("read completely"); tDebug("read half packet, continue to read");
} }
return; return;
} }
@ -291,17 +285,17 @@ void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
uv_close((uv_handle_t*)cli, uvConnCtxDestroy); uv_close((uv_handle_t*)cli, uvConnCtxDestroy);
} }
void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(sizeof(char)); buf->base = malloc(sizeof(char));
buf->len = 2; buf->len = 2;
} }
void onTimeout(uv_timer_t* handle) { void uvOnTimeoutCb(uv_timer_t* handle) {
// opt // opt
tDebug("time out"); tDebug("time out");
} }
void onWrite(uv_write_t* req, int status) { void uvOnWriteCb(uv_write_t* req, int status) {
SConnCtx* ctx = req->data; SConnCtx* ctx = req->data;
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("data already was written on stream");
@ -311,7 +305,7 @@ void onWrite(uv_write_t* req, int status) {
// opt // opt
} }
void workerAsyncCB(uv_async_t* handle) { void uvWorkerAsyncCb(uv_async_t* handle) {
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SConnCtx* conn = NULL; SConnCtx* conn = NULL;
@ -329,7 +323,7 @@ void workerAsyncCB(uv_async_t* handle) {
} }
} }
void onAccept(uv_stream_t* stream, int status) { void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) { if (status == -1) {
return; return;
} }
@ -345,12 +339,12 @@ void onAccept(uv_stream_t* stream, int status) {
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
} else { } else {
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
} }
} }
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tDebug("connection coming"); tDebug("connection coming");
if (nread < 0) { if (nread < 0) {
if (nread != UV_EOF) { if (nread != UV_EOF) {
@ -396,7 +390,7 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd); tDebug("new connection created: %d", fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else { } else {
connCtxDestroy(pConn); connCtxDestroy(pConn);
} }
@ -412,7 +406,7 @@ void* acceptThread(void* arg) {
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
int err = 0; int err = 0;
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
tError("Listen error %s\n", uv_err_name(err)); tError("Listen error %s\n", uv_err_name(err));
return NULL; return NULL;
} }
@ -430,9 +424,9 @@ void* workerThread(void* arg) {
QUEUE_INIT(&pObj->conn); QUEUE_INIT(&pObj->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t)); pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection); uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pObj->loop, UV_RUN_DEFAULT); uv_run(pObj->loop, UV_RUN_DEFAULT);
} }
static SConnCtx* connCtxCreate() { static SConnCtx* connCtxCreate() {
@ -455,6 +449,20 @@ static void uvConnCtxDestroy(uv_handle_t* handle) {
SConnCtx* ctx = handle->data; SConnCtx* ctx = handle->data;
connCtxDestroy(ctx); connCtxDestroy(ctx);
} }
void rpcClose(void* arg) { return; }
void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
#else #else
@ -465,26 +473,6 @@ static void uvConnCtxDestroy(uv_handle_t* handle) {
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U) #define rpcIsReq(type) (type & 1U)
typedef struct {
SRpcInfo * pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void * ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t * pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg * pRsp; // for synchronous API
tsem_t * pSem; // for synchronous API
SEpSet * pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[48]; // debug info: label + pConn + ahandle char info[48]; // debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID