refactor rpc
This commit is contained in:
parent
b65bbcfd17
commit
cea06000bf
|
@ -95,8 +95,10 @@ static void clientHandleExcept(SCliConn* conn);
|
||||||
// handle req from app
|
// handle req from app
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
|
||||||
static void clientMsgDestroy(SCliMsg* pMsg);
|
static void destroyUserdata(SRpcMsg* userdata);
|
||||||
static void destroyTransConnCtx(STransConnCtx* ctx);
|
|
||||||
|
static void destroyCmsg(SCliMsg* cmsg);
|
||||||
|
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
// thread obj
|
// thread obj
|
||||||
static SCliThrdObj* createThrdObj();
|
static SCliThrdObj* createThrdObj();
|
||||||
static void destroyThrdObj(SCliThrdObj* pThrd);
|
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
|
@ -104,7 +106,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
static void* clientThread(void* arg);
|
static void* clientThread(void* arg);
|
||||||
|
|
||||||
static void clientHandleResp(SCliConn* conn) {
|
static void clientHandleResp(SCliConn* conn) {
|
||||||
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
SCliMsg* pMsg = conn->data;
|
||||||
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SRpcInfo* pRpc = pCtx->pTransInst;
|
SRpcInfo* pRpc = pCtx->pTransInst;
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||||
|
@ -122,41 +125,44 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
conn->notifyCount += 1;
|
conn->notifyCount += 1;
|
||||||
|
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
// buf's mem alread translated to rpcMsg.pCont
|
||||||
tfree(conn->data);
|
|
||||||
// buf alread translated to rpcMsg.pCont
|
|
||||||
transClearBuffer(&conn->readBuf);
|
transClearBuffer(&conn->readBuf);
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
||||||
|
|
||||||
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||||
|
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
conn->data = NULL;
|
||||||
// start thread's timer of conn pool if not active
|
// start thread's timer of conn pool if not active
|
||||||
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
|
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
|
||||||
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
}
|
}
|
||||||
destroyTransConnCtx(pCtx);
|
|
||||||
}
|
}
|
||||||
static void clientHandleExcept(SCliConn* pConn) {
|
static void clientHandleExcept(SCliConn* pConn) {
|
||||||
if (pConn->data == NULL) {
|
if (pConn->data == NULL) {
|
||||||
|
// handle conn except in conn pool
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tDebug("conn %p destroy", pConn);
|
tDebug("conn %p start to destroy", pConn);
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
transFreeMsg((pMsg->msg.pCont));
|
|
||||||
pMsg->msg.pCont = NULL;
|
destroyUserdata(&pMsg->msg);
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SRpcInfo* pRpc = pCtx->pTransInst;
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
rpcMsg.code = -1;
|
rpcMsg.code = -1;
|
||||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
||||||
tfree(pConn->data);
|
|
||||||
pConn->notifyCount += 1;
|
pConn->notifyCount += 1;
|
||||||
destroyTransConnCtx(pCtx);
|
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
pConn->data = NULL;
|
||||||
|
// transDestroyConnCtx(pCtx);
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,6 +242,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||||
|
|
||||||
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||||
|
|
||||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
conn->notifyCount = 0;
|
conn->notifyCount = 0;
|
||||||
|
@ -282,6 +289,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
}
|
}
|
||||||
assert(nread <= 0);
|
assert(nread <= 0);
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
|
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
|
||||||
|
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
|
||||||
|
// read(2).
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread < 0 || nread == UV_EOF) {
|
if (nread < 0 || nread == UV_EOF) {
|
||||||
|
@ -321,11 +331,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
tDebug("conn %p data already was written out", pConn);
|
tDebug("conn %p data already was written out", pConn);
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
if (pMsg != NULL) {
|
if (pMsg == NULL) {
|
||||||
transFreeMsg((pMsg->msg.pCont));
|
destroy
|
||||||
pMsg->msg.pCont = NULL;
|
// handle
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
destroyUserdata(&pMsg->msg);
|
||||||
} else {
|
} else {
|
||||||
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
|
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
|
||||||
clientHandleExcept(pConn);
|
clientHandleExcept(pConn);
|
||||||
|
@ -453,8 +464,20 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
}
|
}
|
||||||
return cli;
|
return cli;
|
||||||
}
|
}
|
||||||
static void clientMsgDestroy(SCliMsg* pMsg) {
|
|
||||||
// impl later
|
static void destroyUserdata(SRpcMsg* userdata) {
|
||||||
|
if (userdata->pCont == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
transFreeMsg(userdata->pCont);
|
||||||
|
userdata->pCont = NULL;
|
||||||
|
}
|
||||||
|
static void destroyCmsg(SCliMsg* pMsg) {
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
transDestroyConnCtx(pMsg->ctx);
|
||||||
|
destroyUserdata(&pMsg->msg);
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
}
|
}
|
||||||
static SCliThrdObj* createThrdObj() {
|
static SCliThrdObj* createThrdObj() {
|
||||||
|
@ -487,7 +510,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
|
||||||
free(pThrd);
|
free(pThrd);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyTransConnCtx(STransConnCtx* ctx) {
|
static void transDestroyConnCtx(STransConnCtx* ctx) {
|
||||||
if (ctx != NULL) {
|
if (ctx != NULL) {
|
||||||
free(ctx->ip);
|
free(ctx->ip);
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
|
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
static void destroySrvMsg(SSrvConn* conn);
|
static void destroySmsg(SSrvMsg* smsg);
|
||||||
// check whether already read complete packet
|
// check whether already read complete packet
|
||||||
static bool readComplete(SConnBuffer* buf);
|
static bool readComplete(SConnBuffer* buf);
|
||||||
static SSrvConn* createConn();
|
static SSrvConn* createConn();
|
||||||
|
@ -305,8 +305,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||||
|
|
||||||
void uvOnWriteCb(uv_write_t* req, int status) {
|
void uvOnWriteCb(uv_write_t* req, int status) {
|
||||||
SSrvConn* conn = req->data;
|
SSrvConn* conn = req->data;
|
||||||
SSrvMsg* smsg = conn->pSrvMsg;
|
|
||||||
destroySrvMsg(conn);
|
SSrvMsg* smsg = conn->pSrvMsg;
|
||||||
|
destroySmsg(smsg);
|
||||||
|
conn->pSrvMsg = NULL;
|
||||||
|
|
||||||
transClearBuffer(&conn->readBuf);
|
transClearBuffer(&conn->readBuf);
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
|
@ -362,14 +364,12 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static void destroySrvMsg(SSrvConn* conn) {
|
static void destroySmsg(SSrvMsg* smsg) {
|
||||||
SSrvMsg* smsg = conn->pSrvMsg;
|
|
||||||
if (smsg == NULL) {
|
if (smsg == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transFreeMsg(smsg->msg.pCont);
|
transFreeMsg(smsg->msg.pCont);
|
||||||
free(conn->pSrvMsg);
|
free(smsg);
|
||||||
conn->pSrvMsg = NULL;
|
|
||||||
}
|
}
|
||||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
SWorkThrdObj* pThrd = handle->data;
|
SWorkThrdObj* pThrd = handle->data;
|
||||||
|
@ -555,7 +555,8 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
destroySrvMsg(conn);
|
destroySmsg(conn->pSrvMsg);
|
||||||
|
conn->pSrvMsg = NULL;
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
||||||
|
|
|
@ -165,6 +165,7 @@ int main(int argc, char *argv[]) {
|
||||||
tError("failed to start RPC server");
|
tError("failed to start RPC server");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
// sleep(5);
|
||||||
|
|
||||||
tInfo("RPC server is running, ctrl-c to exit");
|
tInfo("RPC server is running, ctrl-c to exit");
|
||||||
|
|
||||||
|
@ -172,7 +173,6 @@ int main(int argc, char *argv[]) {
|
||||||
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno));
|
if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
qhandle = taosOpenQueue();
|
qhandle = taosOpenQueue();
|
||||||
qset = taosOpenQset();
|
qset = taosOpenQset();
|
||||||
taosAddIntoQset(qset, qhandle, NULL);
|
taosAddIntoQset(qset, qhandle, NULL);
|
||||||
|
|
Loading…
Reference in New Issue