Merge pull request #16099 from taosdata/enh/refactorRpcCode
refactor code
This commit is contained in:
commit
6248d57d7a
|
@ -103,14 +103,6 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||||
static void addConnToPool(void* pool, SCliConn* conn);
|
static void addConnToPool(void* pool, SCliConn* conn);
|
||||||
static void doCloseIdleConn(void* param);
|
static void doCloseIdleConn(void* param);
|
||||||
|
|
||||||
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
|
|
||||||
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
|
|
||||||
|
|
||||||
char buf[16] = {0};
|
|
||||||
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
|
|
||||||
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
// register timer for read
|
// register timer for read
|
||||||
static void cliReadTimeoutCb(uv_timer_t* handle);
|
static void cliReadTimeoutCb(uv_timer_t* handle);
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
|
@ -121,12 +113,14 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_
|
||||||
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
// callback after write data to socket
|
// callback after write data to socket
|
||||||
static void cliSendCb(uv_write_t* req, int status);
|
static void cliSendCb(uv_write_t* req, int status);
|
||||||
// callback after conn to server
|
// callback after conn to server
|
||||||
static void cliConnCb(uv_connect_t* req, int status);
|
static void cliConnCb(uv_connect_t* req, int status);
|
||||||
static void cliAsyncCb(uv_async_t* handle);
|
static void cliAsyncCb(uv_async_t* handle);
|
||||||
static void cliIdleCb(uv_idle_t* handle);
|
static void cliIdleCb(uv_idle_t* handle);
|
||||||
static void cliPrepareCb(uv_prepare_t* handle);
|
static void cliPrepareCb(uv_prepare_t* handle);
|
||||||
|
|
||||||
|
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
|
||||||
|
|
||||||
static int32_t allocConnRef(SCliConn* conn, bool update);
|
static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||||
|
|
||||||
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||||
|
@ -361,6 +355,9 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = NULL;
|
||||||
STransConnCtx* pCtx = NULL;
|
STransConnCtx* pCtx = NULL;
|
||||||
|
if (cliRecvReleaseReq(conn, pHead)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
CONN_SHOULD_RELEASE(conn, pHead);
|
CONN_SHOULD_RELEASE(conn, pHead);
|
||||||
|
|
||||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
|
@ -383,7 +380,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
transMsg.info.ahandle);
|
transMsg.info.ahandle);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pCtx = pMsg ? pMsg->ctx : NULL;
|
pCtx = pMsg->ctx;
|
||||||
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
|
||||||
}
|
}
|
||||||
|
@ -395,7 +392,6 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STraceId* trace = &transMsg.info.traceId;
|
STraceId* trace = &transMsg.info.traceId;
|
||||||
|
|
||||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
|
||||||
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
|
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
|
||||||
|
|
||||||
|
@ -1053,6 +1049,30 @@ static void cliPrepareCb(uv_prepare_t* handle) {
|
||||||
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
|
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||||
|
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||||
|
uint64_t ahandle = pHead->ahandle;
|
||||||
|
SCliMsg* pMsg = NULL;
|
||||||
|
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||||
|
transClearBuffer(&conn->readBuf);
|
||||||
|
transFreeMsg(transContFromHead((char*)pHead));
|
||||||
|
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
|
||||||
|
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
|
||||||
|
if (cliMsg->type == Release) return true;
|
||||||
|
}
|
||||||
|
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
|
||||||
|
if (T_REF_VAL_GET(conn) > 1) {
|
||||||
|
transUnrefCliHandle(conn);
|
||||||
|
}
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
cliReleaseUnfinishedMsg(conn);
|
||||||
|
transQueueClear(&conn->cliMsgs);
|
||||||
|
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg) {
|
static void* cliWorkThread(void* arg) {
|
||||||
SCliThrd* pThrd = (SCliThrd*)arg;
|
SCliThrd* pThrd = (SCliThrd*)arg;
|
||||||
pThrd->pid = taosGetSelfPthreadId();
|
pThrd->pid = taosGetSelfPthreadId();
|
||||||
|
|
|
@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
||||||
static void uvPrepareCb(uv_prepare_t* handle);
|
static void uvPrepareCb(uv_prepare_t* handle);
|
||||||
|
|
||||||
|
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* time-consuming task throwed into BG work thread
|
* time-consuming task throwed into BG work thread
|
||||||
*/
|
*/
|
||||||
|
@ -123,7 +125,7 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
|
||||||
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
static void uvWalkCb(uv_handle_t* handle, void* arg);
|
||||||
static void uvFreeCb(uv_handle_t* handle);
|
static void uvFreeCb(uv_handle_t* handle);
|
||||||
|
|
||||||
static void uvStartSendRespInternal(SSvrMsg* smsg);
|
static void uvStartSendRespImpl(SSvrMsg* smsg);
|
||||||
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSvrMsg* msg);
|
static void uvStartSendResp(SSvrMsg* msg);
|
||||||
|
|
||||||
|
@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg);
|
||||||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
||||||
static bool addHandleToAcceptloop(void* arg);
|
static bool addHandleToAcceptloop(void* arg);
|
||||||
|
|
||||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
|
||||||
do { \
|
|
||||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
|
||||||
reallocConnRef(conn); \
|
|
||||||
tTrace("conn %p received release request", conn); \
|
|
||||||
\
|
|
||||||
STraceId traceId = head->traceId; \
|
|
||||||
conn->status = ConnRelease; \
|
|
||||||
transClearBuffer(&conn->readBuf); \
|
|
||||||
transFreeMsg(transContFromHead((char*)head)); \
|
|
||||||
\
|
|
||||||
STransMsg tmsg = { \
|
|
||||||
.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \
|
|
||||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
|
|
||||||
srvMsg->msg = tmsg; \
|
|
||||||
srvMsg->type = Release; \
|
|
||||||
srvMsg->pConn = conn; \
|
|
||||||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
|
||||||
return; \
|
|
||||||
} \
|
|
||||||
if (conn->regArg.init) { \
|
|
||||||
tTrace("conn %p release, notify server app", conn); \
|
|
||||||
STrans* pTransInst = conn->pTransInst; \
|
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
|
||||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
|
||||||
} \
|
|
||||||
uvStartSendRespInternal(srvMsg); \
|
|
||||||
return; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define SRV_RELEASE_UV(loop) \
|
#define SRV_RELEASE_UV(loop) \
|
||||||
do { \
|
do { \
|
||||||
uv_walk(loop, uvWalkCb, NULL); \
|
uv_walk(loop, uvWalkCb, NULL); \
|
||||||
|
@ -230,7 +201,9 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
// transRefSrvHandle(pConn);
|
// transRefSrvHandle(pConn);
|
||||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||||
|
|
||||||
CONN_SHOULD_RELEASE(pConn, pHead);
|
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
STransMsg transMsg;
|
STransMsg transMsg;
|
||||||
memset(&transMsg, 0, sizeof(transMsg));
|
memset(&transMsg, 0, sizeof(transMsg));
|
||||||
|
@ -356,10 +329,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
|
|
||||||
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
||||||
if (msg != NULL) {
|
if (msg != NULL) {
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespImpl(msg);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespImpl(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -423,7 +396,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvStartSendRespInternal(SSvrMsg* smsg) {
|
static void uvStartSendRespImpl(SSvrMsg* smsg) {
|
||||||
SSvrConn* pConn = smsg->pConn;
|
SSvrConn* pConn = smsg->pConn;
|
||||||
if (pConn->broken) {
|
if (pConn->broken) {
|
||||||
return;
|
return;
|
||||||
|
@ -453,7 +426,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
|
||||||
if (!transQueuePush(&pConn->srvMsgs, smsg)) {
|
if (!transQueuePush(&pConn->srvMsgs, smsg)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uvStartSendRespInternal(smsg);
|
uvStartSendRespImpl(smsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -544,6 +517,35 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
|
uv_close((uv_handle_t*)req->handle, uvDestroyConn);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
||||||
|
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||||
|
reallocConnRef(pConn);
|
||||||
|
tTrace("conn %p received release request", pConn);
|
||||||
|
|
||||||
|
STraceId traceId = pHead->traceId;
|
||||||
|
pConn->status = ConnRelease;
|
||||||
|
transClearBuffer(&pConn->readBuf);
|
||||||
|
transFreeMsg(transContFromHead((char*)pHead));
|
||||||
|
|
||||||
|
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
|
||||||
|
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
srvMsg->msg = tmsg;
|
||||||
|
srvMsg->type = Release;
|
||||||
|
srvMsg->pConn = pConn;
|
||||||
|
if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (pConn->regArg.init) {
|
||||||
|
tTrace("conn %p release, notify server app", pConn);
|
||||||
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
|
(*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
|
||||||
|
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
||||||
|
}
|
||||||
|
uvStartSendRespImpl(srvMsg);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
static void uvPrepareCb(uv_prepare_t* handle) {
|
static void uvPrepareCb(uv_prepare_t* handle) {
|
||||||
// prepare callback
|
// prepare callback
|
||||||
SWorkThrd* pThrd = handle->data;
|
SWorkThrd* pThrd = handle->data;
|
||||||
|
@ -992,7 +994,7 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespImpl(msg);
|
||||||
return;
|
return;
|
||||||
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
|
} else if (conn->status == ConnRelease || conn->status == ConnNormal) {
|
||||||
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
|
tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
|
||||||
|
|
Loading…
Reference in New Issue