diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d0cb9af710..4269865993 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -169,6 +169,19 @@ static void destroyThrdObj(SCliThrdObj* pThrd); pMsg = transQueueRm(&conn->cliMsgs, i); \ } \ } while (0) +#define CONN_GET_NEXT_SENDMSG(conn) \ + do { \ + int i = 0; \ + do { \ + pCliMsg = transQueueGet(&conn->cliMsgs, i++); \ + if (pCliMsg && 0 == pCliMsg->sent) { \ + break; \ + } \ + } while (pCliMsg != NULL); \ + if (pCliMsg == NULL) { \ + goto _RETURN; \ + } \ + } while (0) #define CONN_HANDLE_THREAD_QUIT(thrd) \ do { \ @@ -193,7 +206,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) - +#define CONN_RELEASE_BY_SERVER(conn) ((conn)->status == ConnRelease && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) @@ -203,19 +216,12 @@ static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; - int i = 0; - do { - pCliMsg = transQueueGet(&conn->cliMsgs, i++); - if (pCliMsg && 0 == pCliMsg->sent) { - break; - } - } while (pCliMsg != NULL); - if (pCliMsg == NULL) { - return false; - } + CONN_GET_NEXT_SENDMSG(conn); cliSend(conn); } return false; +_RETURN: + return false; } void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; @@ -238,26 +244,26 @@ void cliHandleResp(SCliConn* conn) { if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); - pCtx = pMsg ? pMsg->ctx: NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); - } - tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); - } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; - tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); - } + pCtx = pMsg ? pMsg->ctx : NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } + tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); + } else { + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); + } } else { uint64_t ahandle = (uint64_t)pHead->ahandle; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); if (pMsg == NULL) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType); - if (transMsg.ahandle == NULL) { - tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); + if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle = NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); } } else { pCtx = pMsg ? pMsg->ctx : NULL; @@ -284,6 +290,11 @@ void cliHandleResp(SCliConn* conn) { // transUnrefCliHandle(conn); return; } + if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) { + tTrace("except, server continue send while cli ignore it"); + // transUnrefCliHandle(conn); + return; + } if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); @@ -320,7 +331,7 @@ void cliHandleExcept(SCliConn* pConn) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - while(!transQueueEmpty(&pConn->cliMsgs)){ + while (!transQueueEmpty(&pConn->cliMsgs)) { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; @@ -565,17 +576,7 @@ void cliSend(SCliConn* pConn) { assert(!transQueueEmpty(&pConn->cliMsgs)); SCliMsg* pCliMsg = NULL; - int i = 0; - do { - pCliMsg = transQueueGet(&pConn->cliMsgs, i++); - if (pCliMsg && 0 == pCliMsg->sent) { - break; - } - } while (pCliMsg != NULL); - if (pCliMsg == NULL) { - return; - } - + CONN_GET_NEXT_SENDMSG(pConn); pCliMsg->sent = 1; STransConnCtx* pCtx = pCliMsg->ctx; @@ -630,6 +631,8 @@ void cliSend(SCliConn* pConn) { pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); + return; +_RETURN: return; }