handle quit error
This commit is contained in:
parent
5ba90c2388
commit
57743282db
|
@ -132,6 +132,7 @@ typedef struct SCliReq {
|
||||||
uint64_t st;
|
uint64_t st;
|
||||||
int64_t seq;
|
int64_t seq;
|
||||||
int32_t sent; //(0: no send, 1: alread sent)
|
int32_t sent; //(0: no send, 1: alread sent)
|
||||||
|
int8_t inSendQ;
|
||||||
STransMsg msg;
|
STransMsg msg;
|
||||||
int8_t inRetry;
|
int8_t inRetry;
|
||||||
|
|
||||||
|
@ -275,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
|
||||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
||||||
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq);
|
||||||
|
|
||||||
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
||||||
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
||||||
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
||||||
|
@ -701,6 +704,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
|
|
||||||
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
||||||
STraceId* trace = &resp.info.traceId;
|
STraceId* trace = &resp.info.traceId;
|
||||||
|
@ -1279,7 +1283,7 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
bool filterToRmReq(void* h, void* arg) {
|
bool filterToRmReq(void* h, void* arg) {
|
||||||
queue* el = h;
|
queue* el = h;
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
|
if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1311,7 +1315,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
|
SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
pReq->sent = 1;
|
pReq->inSendQ = 0;
|
||||||
}
|
}
|
||||||
freeWReqToWQ(&conn->wq, wrapper);
|
freeWReqToWQ(&conn->wq, wrapper);
|
||||||
|
|
||||||
|
@ -1473,6 +1477,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
totalLen += msgLen;
|
totalLen += msgLen;
|
||||||
|
|
||||||
pCliMsg->seq = pConn->seq;
|
pCliMsg->seq = pConn->seq;
|
||||||
|
pCliMsg->inSendQ = 1;
|
||||||
|
|
||||||
STraceId* trace = &pCliMsg->msg.info.traceId;
|
STraceId* trace = &pCliMsg->msg.info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
||||||
|
@ -1482,6 +1487,8 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
||||||
QUEUE_INIT(&pCliMsg->sendQ);
|
QUEUE_INIT(&pCliMsg->sendQ);
|
||||||
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
|
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
|
||||||
|
|
||||||
|
pCliMsg->inSendQ = 1;
|
||||||
if (j >= batchLimit) {
|
if (j >= batchLimit) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1491,6 +1498,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
|
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
||||||
|
while (!QUEUE_IS_EMPTY(&reqToSend)) {
|
||||||
|
queue* h = QUEUE_HEAD(&reqToSend);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
pCliMsg->inSendQ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
transRefCliHandle(pConn);
|
transRefCliHandle(pConn);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -1503,6 +1518,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
||||||
|
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
|
||||||
|
queue* h = QUEUE_HEAD(&pWreq->node);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
pCliMsg->inSendQ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
freeWReqToWQ(&pConn->wq, req->data);
|
freeWReqToWQ(&pConn->wq, req->data);
|
||||||
code = TSDB_CODE_THIRDPARTY_ERROR;
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
||||||
|
@ -2214,11 +2237,21 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) {
|
||||||
|
if (pReq == NULL || pReq->inSendQ == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
QUEUE_REMOVE(&pReq->sendQ);
|
||||||
|
pReq->inSendQ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyReq(void* arg) {
|
static FORCE_INLINE void destroyReq(void* arg) {
|
||||||
SCliReq* pReq = arg;
|
SCliReq* pReq = arg;
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
STraceId* trace = &pReq->msg.info.traceId;
|
STraceId* trace = &pReq->msg.info.traceId;
|
||||||
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
||||||
|
|
||||||
|
@ -2993,6 +3026,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
if (pResp->code != TSDB_CODE_SUCCESS) {
|
if (pResp->code != TSDB_CODE_SUCCESS) {
|
||||||
if (cliMayRetry(pConn, pReq, pResp)) {
|
if (cliMayRetry(pConn, pReq, pResp)) {
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
|
|
Loading…
Reference in New Issue