Merge pull request #9974 from taosdata/origin/feature/rpc-refactor
refactor code and test reading half packet
This commit is contained in:
commit
890b2df647
|
@ -32,6 +32,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->cfp = pInit->cfp;
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
pRpc->connType = pInit->connType;
|
||||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
return pRpc;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
#include "transComm.h"
|
||||
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 100)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
|
||||
typedef struct SCliConn {
|
||||
uv_connect_t connReq;
|
||||
|
@ -65,15 +65,15 @@ typedef struct SConnList {
|
|||
|
||||
// conn pool
|
||||
// add expire timeout and capacity limit
|
||||
static void* connPoolCreate(int size);
|
||||
static void* connPoolDestroy(void* pool);
|
||||
static void* creatConnPool(int size);
|
||||
static void* destroyConnPool(void* pool);
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
||||
|
||||
// register timer in each thread to clear expire conn
|
||||
static void clientTimeoutCb(uv_timer_t* handle);
|
||||
// process data read from server, auth/decompress etc later
|
||||
static void clientProcessData(SCliConn* conn);
|
||||
static void clientHandleResp(SCliConn* conn);
|
||||
// check whether already read complete packet from server
|
||||
static bool clientReadComplete(SConnBuffer* pBuf);
|
||||
// alloc buf for read
|
||||
|
@ -86,9 +86,11 @@ static void clientWriteCb(uv_write_t* req, int status);
|
|||
static void clientConnCb(uv_connect_t* req, int status);
|
||||
static void clientAsyncCb(uv_async_t* handle);
|
||||
static void clientDestroy(uv_handle_t* handle);
|
||||
static void clientConnDestroy(SCliConn* pConn);
|
||||
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||
|
||||
static void clientMsgDestroy(SCliMsg* pMsg);
|
||||
// handle req from app
|
||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
|
||||
// thread obj
|
||||
static SCliThrdObj* createThrdObj();
|
||||
|
@ -96,9 +98,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
// thread
|
||||
static void* clientThread(void* arg);
|
||||
|
||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
|
||||
static void clientProcessData(SCliConn* conn) {
|
||||
static void clientHandleResp(SCliConn* conn) {
|
||||
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
||||
SRpcInfo* pRpc = pCtx->pRpc;
|
||||
SRpcMsg rpcMsg;
|
||||
|
@ -131,7 +131,9 @@ static void clientTimeoutCb(uv_timer_t* handle) {
|
|||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||
if (c->expireTime < currentTime) {
|
||||
QUEUE_REMOVE(h);
|
||||
clientConnDestroy(c);
|
||||
// uv_stream_t stm = *(c->stream);
|
||||
// uv_close((uv_handle_t*)&stm, clientDestroy);
|
||||
clientConnDestroy(c, true);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -142,18 +144,18 @@ static void clientTimeoutCb(uv_timer_t* handle) {
|
|||
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||
}
|
||||
static void* connPoolCreate(int size) {
|
||||
SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
return pool;
|
||||
static void* creatConnPool(int size) {
|
||||
// thread local, no lock
|
||||
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
}
|
||||
static void* connPoolDestroy(void* pool) {
|
||||
static void* destroyConnPool(void* pool) {
|
||||
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
||||
while (connList != NULL) {
|
||||
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
||||
queue* h = QUEUE_HEAD(&connList->conn);
|
||||
QUEUE_REMOVE(h);
|
||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||
clientConnDestroy(c);
|
||||
clientConnDestroy(c, true);
|
||||
}
|
||||
connList = taosHashIterate((SHashObj*)pool, connList);
|
||||
}
|
||||
|
@ -245,28 +247,37 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
pBuf->len += nread;
|
||||
if (clientReadComplete(pBuf)) {
|
||||
tDebug("alread read complete");
|
||||
clientProcessData(conn);
|
||||
clientHandleResp(conn);
|
||||
} else {
|
||||
tDebug("read halp packet, continue to read");
|
||||
tDebug("read half packet, continue to read");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread != UV_EOF) {
|
||||
tDebug("Read error %s\n", uv_err_name(nread));
|
||||
assert(nread <= 0);
|
||||
if (nread == 0) {
|
||||
return;
|
||||
}
|
||||
//
|
||||
uv_close((uv_handle_t*)handle, clientDestroy);
|
||||
if (nread != UV_EOF) {
|
||||
tDebug("read error %s", uv_err_name(nread));
|
||||
}
|
||||
// tDebug("Read error %s\n", uv_err_name(nread));
|
||||
// uv_close((uv_handle_t*)handle, clientDestroy);
|
||||
}
|
||||
|
||||
static void clientConnDestroy(SCliConn* conn) {
|
||||
// impl later
|
||||
//
|
||||
static void clientConnDestroy(SCliConn* conn, bool clear) {
|
||||
tDebug("conn %p destroy", conn);
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->stream, NULL);
|
||||
}
|
||||
free(conn->stream);
|
||||
free(conn->readBuf.buf);
|
||||
free(conn->writeReq);
|
||||
free(conn);
|
||||
}
|
||||
static void clientDestroy(uv_handle_t* handle) {
|
||||
SCliConn* conn = handle->data;
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
clientConnDestroy(conn);
|
||||
// QUEUE_REMOVE(&conn->conn);
|
||||
clientConnDestroy(conn, false);
|
||||
}
|
||||
|
||||
static void clientWriteCb(uv_write_t* req, int status) {
|
||||
|
@ -274,7 +285,8 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
|||
if (status == 0) {
|
||||
tDebug("data already was written on stream");
|
||||
} else {
|
||||
uv_close((uv_handle_t*)pConn->stream, clientDestroy);
|
||||
tError("failed to write: %s", uv_err_name(status));
|
||||
clientConnDestroy(pConn, true);
|
||||
return;
|
||||
}
|
||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||
|
@ -317,7 +329,9 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
|||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
uv_close((uv_handle_t*)req->handle, clientDestroy);
|
||||
|
||||
clientConnDestroy(pConn, true);
|
||||
// uv_close((uv_handle_t*)req->handle, clientDestroy);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -421,7 +435,6 @@ static void clientMsgDestroy(SCliMsg* pMsg) {
|
|||
}
|
||||
static SCliThrdObj* createThrdObj() {
|
||||
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
||||
|
||||
QUEUE_INIT(&pThrd->msg);
|
||||
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
||||
|
||||
|
@ -436,7 +449,7 @@ static SCliThrdObj* createThrdObj() {
|
|||
uv_timer_init(pThrd->loop, pThrd->pTimer);
|
||||
pThrd->pTimer->data = pThrd;
|
||||
|
||||
pThrd->pool = connPoolCreate(1);
|
||||
pThrd->pool = creatConnPool(1);
|
||||
return pThrd;
|
||||
}
|
||||
static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
*/
|
||||
|
||||
#ifdef USE_UV
|
||||
#include "transComm.h"
|
||||
|
||||
#include "transComm.h"
|
||||
typedef struct SConn {
|
||||
uv_tcp_t* pTcp;
|
||||
uv_write_t* pWriter;
|
||||
|
@ -84,12 +84,12 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
|
|||
|
||||
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
|
||||
|
||||
// already read complete packet
|
||||
static bool readComplete(SConnBuffer* buf);
|
||||
// check whether already read complete packet
|
||||
static bool readComplete(SConnBuffer* buf);
|
||||
static SConn* createConn();
|
||||
static void destroyConn(SConn* conn, bool clear /*clear handle or not*/);
|
||||
|
||||
static SConn* connCreate();
|
||||
static void connDestroy(SConn* conn);
|
||||
static void uvConnDestroy(uv_handle_t* handle);
|
||||
static void uvDestroyConn(uv_handle_t* handle);
|
||||
|
||||
// server and worker thread
|
||||
static void* workerThread(void* arg);
|
||||
|
@ -283,6 +283,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
SConnBuffer* pBuf = &conn->connBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread);
|
||||
if (readComplete(pBuf)) {
|
||||
tDebug("alread read complete packet");
|
||||
uvProcessData(conn);
|
||||
|
@ -291,11 +292,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
}
|
||||
return;
|
||||
}
|
||||
if (nread != UV_EOF) {
|
||||
tDebug("Read error %s", uv_err_name(nread));
|
||||
if (nread == 0) {
|
||||
return;
|
||||
}
|
||||
if (nread != UV_EOF) {
|
||||
tDebug("read error %s", uv_err_name(nread));
|
||||
}
|
||||
tDebug("read error %s", uv_err_name(nread));
|
||||
uv_close((uv_handle_t*)cli, uvConnDestroy);
|
||||
}
|
||||
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
buf->base = malloc(sizeof(char));
|
||||
|
@ -318,7 +320,7 @@ void uvOnWriteCb(uv_write_t* req, int status) {
|
|||
tDebug("data already was written on stream");
|
||||
} else {
|
||||
tDebug("failed to write data, %s", uv_err_name(status));
|
||||
connDestroy(conn);
|
||||
destroyConn(conn, true);
|
||||
}
|
||||
// opt
|
||||
}
|
||||
|
@ -331,7 +333,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
|||
}
|
||||
|
||||
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
|
||||
// impl later
|
||||
// impl later;
|
||||
tDebug("prepare to send back");
|
||||
SRpcMsg* pMsg = &conn->sendMsg;
|
||||
if (pMsg->pCont == 0) {
|
||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||
|
@ -394,6 +397,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
|||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
||||
} else {
|
||||
uv_close((uv_handle_t*)cli, NULL);
|
||||
free(cli);
|
||||
}
|
||||
}
|
||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||
|
@ -403,7 +407,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
tError("read error %s", uv_err_name(nread));
|
||||
}
|
||||
// TODO(log other failure reason)
|
||||
uv_close((uv_handle_t*)q, NULL);
|
||||
// uv_close((uv_handle_t*)q, NULL);
|
||||
return;
|
||||
}
|
||||
// free memory allocated by
|
||||
|
@ -422,7 +426,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
uv_handle_type pending = uv_pipe_pending_type(pipe);
|
||||
assert(pending == UV_TCP);
|
||||
|
||||
SConn* pConn = connCreate();
|
||||
SConn* pConn = createConn();
|
||||
pConn->pTransInst = pThrd->pTransInst;
|
||||
/* init conn timer*/
|
||||
pConn->pTimer = malloc(sizeof(uv_timer_t));
|
||||
|
@ -448,7 +452,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
||||
} else {
|
||||
tDebug("failed to create new connection");
|
||||
connDestroy(pConn);
|
||||
destroyConn(pConn, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -509,7 +513,7 @@ void* workerThread(void* arg) {
|
|||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
static SConn* connCreate() {
|
||||
static SConn* createConn() {
|
||||
SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
|
||||
return pConn;
|
||||
}
|
||||
|
@ -517,22 +521,24 @@ static void connCloseCb(uv_handle_t* handle) {
|
|||
// impl later
|
||||
//
|
||||
}
|
||||
static void connDestroy(SConn* conn) {
|
||||
static void destroyConn(SConn* conn, bool clear) {
|
||||
if (conn == NULL) {
|
||||
return;
|
||||
}
|
||||
if (clear) {
|
||||
uv_handle_t handle = *((uv_handle_t*)conn->pTcp);
|
||||
uv_close(&handle, NULL);
|
||||
}
|
||||
uv_timer_stop(conn->pTimer);
|
||||
free(conn->pTimer);
|
||||
// uv_close((uv_handle_t*)conn->pTcp, connCloseCb);
|
||||
free(conn->pTcp);
|
||||
free(conn->connBuf.buf);
|
||||
free(conn->pWriter);
|
||||
// free(conn);
|
||||
// handle
|
||||
free(conn);
|
||||
}
|
||||
static void uvConnDestroy(uv_handle_t* handle) {
|
||||
static void uvDestroyConn(uv_handle_t* handle) {
|
||||
SConn* conn = handle->data;
|
||||
connDestroy(conn);
|
||||
destroyConn(conn, false);
|
||||
}
|
||||
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
|
||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||
|
|
|
@ -34,8 +34,8 @@ typedef struct {
|
|||
|
||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||
// tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
// pMsg->code);
|
||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
pMsg->code);
|
||||
|
||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
|
||||
|
@ -63,6 +63,8 @@ static void *sendRequest(void *param) {
|
|||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
// tsem_wait(&pInfo->rspSem);
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
tDebug("recv response");
|
||||
// usleep(100000000);
|
||||
}
|
||||
|
||||
tDebug("thread:%d, it is over", pInfo->index);
|
||||
|
@ -98,7 +100,7 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processResponse;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.idleTime = 100;
|
||||
rpcInit.user = "michael";
|
||||
rpcInit.secret = secret;
|
||||
rpcInit.ckey = "key";
|
||||
|
|
|
@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.sessions = 1000;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1500;
|
||||
rpcInit.idleTime = 2 * 1500;
|
||||
rpcInit.afp = retrieveAuthInfo;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
|
|
Loading…
Reference in New Issue