Merge branch 'feature/scheduler' of github.com:taosdata/TDengine into feature/scheduler

This commit is contained in:
dapan1121 2022-03-24 09:29:31 +08:00
commit 13690803af
1 changed files with 39 additions and 36 deletions

View File

@ -169,6 +169,19 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
pMsg = transQueueRm(&conn->cliMsgs, i); \ pMsg = transQueueRm(&conn->cliMsgs, i); \
} \ } \
} while (0) } while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \
do { \
int i = 0; \
do { \
pCliMsg = transQueueGet(&conn->cliMsgs, i++); \
if (pCliMsg && 0 == pCliMsg->sent) { \
break; \
} \
} while (pCliMsg != NULL); \
if (pCliMsg == NULL) { \
goto _RETURN; \
} \
} while (0)
#define CONN_HANDLE_THREAD_QUIT(thrd) \ #define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \ do { \
@ -193,7 +206,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \ } \
} while (0) } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) ((conn)->status == ConnRelease && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
@ -203,19 +216,12 @@ static void* cliWorkThread(void* arg);
bool cliMaySendCachedMsg(SCliConn* conn) { bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) { if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL; SCliMsg* pCliMsg = NULL;
int i = 0; CONN_GET_NEXT_SENDMSG(conn);
do {
pCliMsg = transQueueGet(&conn->cliMsgs, i++);
if (pCliMsg && 0 == pCliMsg->sent) {
break;
}
} while (pCliMsg != NULL);
if (pCliMsg == NULL) {
return false;
}
cliSend(conn); cliSend(conn);
} }
return false; return false;
_RETURN:
return false;
} }
void cliHandleResp(SCliConn* conn) { void cliHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
@ -255,9 +261,9 @@ void cliHandleResp(SCliConn* conn) {
if (pMsg == NULL) { if (pMsg == NULL) {
transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType); tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType);
if (transMsg.ahandle == NULL) { if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle = NULL) {
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle);
transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle);
} }
} else { } else {
pCtx = pMsg ? pMsg->ctx : NULL; pCtx = pMsg ? pMsg->ctx : NULL;
@ -284,6 +290,11 @@ void cliHandleResp(SCliConn* conn) {
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
return; return;
} }
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.ahandle == NULL) {
tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn);
return;
}
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
@ -565,17 +576,7 @@ void cliSend(SCliConn* pConn) {
assert(!transQueueEmpty(&pConn->cliMsgs)); assert(!transQueueEmpty(&pConn->cliMsgs));
SCliMsg* pCliMsg = NULL; SCliMsg* pCliMsg = NULL;
int i = 0; CONN_GET_NEXT_SENDMSG(pConn);
do {
pCliMsg = transQueueGet(&pConn->cliMsgs, i++);
if (pCliMsg && 0 == pCliMsg->sent) {
break;
}
} while (pCliMsg != NULL);
if (pCliMsg == NULL) {
return;
}
pCliMsg->sent = 1; pCliMsg->sent = 1;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
@ -630,6 +631,8 @@ void cliSend(SCliConn* pConn) {
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURN:
return; return;
} }