handle quit error

This commit is contained in:
yihaoDeng 2024-10-30 13:35:34 +08:00
parent 989457969e
commit 5ba90c2388
1 changed files with 8 additions and 16 deletions

View File

@ -127,6 +127,7 @@ typedef struct {
typedef struct SCliReq { typedef struct SCliReq {
SReqCtx* ctx; SReqCtx* ctx;
queue q; queue q;
queue sendQ;
STransMsgType type; STransMsgType type;
uint64_t st; uint64_t st;
int64_t seq; int64_t seq;
@ -1309,8 +1310,8 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
queue* h = QUEUE_HEAD(&wrapper->node); queue* h = QUEUE_HEAD(&wrapper->node);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
transQueuePush(&conn->reqsSentOut, &pReq->q); pReq->sent = 1;
} }
freeWReqToWQ(&conn->wq, wrapper); freeWReqToWQ(&conn->wq, wrapper);
@ -1471,14 +1472,16 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
wb[j++] = uv_buf_init((char*)pHead, msgLen); wb[j++] = uv_buf_init((char*)pHead, msgLen);
totalLen += msgLen; totalLen += msgLen;
pCliMsg->sent = 1;
pCliMsg->seq = pConn->seq; pCliMsg->seq = pConn->seq;
STraceId* trace = &pCliMsg->msg.info.traceId; STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
QUEUE_PUSH(&reqToSend, &pCliMsg->q); // QUEUE_PUSH(&reqToSend, &pCliMsg->q);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
QUEUE_INIT(&pCliMsg->sendQ);
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
if (j >= batchLimit) { if (j >= batchLimit) {
break; break;
} }
@ -1488,29 +1491,18 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
if (req == NULL) { if (req == NULL) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
while (!QUEUE_IS_EMPTY(&reqToSend)) {
queue* h = QUEUE_HEAD(&reqToSend);
QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
transQueuePush(&pConn->reqsToSend, &pReq->q);
}
transRefCliHandle(pConn); transRefCliHandle(pConn);
return terrno; return terrno;
} }
SWReqsWrapper* pWreq = req->data; SWReqsWrapper* pWreq = req->data;
QUEUE_MOVE(&reqToSend, &pWreq->node); QUEUE_MOVE(&reqToSend, &pWreq->node);
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
if (ret != 0) { if (ret != 0) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
queue* h = QUEUE_HEAD(&pWreq->node);
QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
transQueuePush(&pConn->reqsToSend, &pReq->q);
}
freeWReqToWQ(&pConn->wq, req->data); freeWReqToWQ(&pConn->wq, req->data);
code = TSDB_CODE_THIRDPARTY_ERROR; code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_UNUSED(transUnrefCliHandle(pConn)); TAOS_UNUSED(transUnrefCliHandle(pConn));