opt parameter
This commit is contained in:
parent
1bac4a8cf0
commit
24a2da04c1
|
@ -238,6 +238,7 @@ void destroyCliConnQTable(SCliConn* conn) {
|
||||||
|
|
||||||
// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
|
// static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
|
||||||
|
|
||||||
|
static void cliHandleBatch_shareConnExcept(SCliConn* conn);
|
||||||
static int32_t allocConnRef(SCliConn* conn, bool update);
|
static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||||
|
|
||||||
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
|
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
|
||||||
|
@ -921,7 +922,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
if (TRANS_CONN_REF_GET(conn) > 1) {
|
if (TRANS_CONN_REF_GET(conn) > 1) {
|
||||||
transUnrefCliHandle(conn);
|
transUnrefCliHandle(conn);
|
||||||
}
|
}
|
||||||
cliDestroyConnMsgs(conn, false);
|
// cliDestroyConnMsgs(conn, false);
|
||||||
|
|
||||||
if (conn->list == NULL && conn->dstAddr != NULL) {
|
if (conn->list == NULL && conn->dstAddr != NULL) {
|
||||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||||
|
@ -1046,7 +1047,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
if (pBuf->invalid) {
|
if (pBuf->invalid) {
|
||||||
cliHandleExcept(conn, -1);
|
return cliHandleBatch_shareConnExcept(conn);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
cliHandleResp2(conn);
|
cliHandleResp2(conn);
|
||||||
|
@ -1238,7 +1239,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid);
|
tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid);
|
||||||
}
|
}
|
||||||
|
|
||||||
cliDestroyConnMsgs(conn, true);
|
// cliDestroyConnMsgs(conn, true);
|
||||||
destroyCliConnQTable(conn);
|
destroyCliConnQTable(conn);
|
||||||
|
|
||||||
if (conn->pInitUserReq) {
|
if (conn->pInitUserReq) {
|
||||||
|
@ -1260,6 +1261,10 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
queue set;
|
queue set;
|
||||||
|
QUEUE_INIT(&set);
|
||||||
|
// TODO
|
||||||
|
// 1. from qId from thread table
|
||||||
|
// 2. not itera to all reqs
|
||||||
transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1);
|
transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1);
|
||||||
transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1);
|
transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1);
|
||||||
|
|
||||||
|
@ -1275,8 +1280,13 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
|
||||||
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
|
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
|
||||||
resp.info.cliVer = pInst->compatibilityVer;
|
resp.info.cliVer = pInst->compatibilityVer;
|
||||||
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
|
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
|
||||||
// not
|
if (pReq) {
|
||||||
if (pCtx == NULL) {
|
resp.info.traceId = pReq->msg.info.traceId;
|
||||||
|
}
|
||||||
|
// resp.info.traceId = pReq ? pReq->msg.info.traceId : {0, 0};
|
||||||
|
|
||||||
|
// handle noresp and inter manage msg
|
||||||
|
if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) {
|
||||||
destroyReq(pReq);
|
destroyReq(pReq);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1290,6 +1300,9 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
|
||||||
destroyReq(pReq);
|
destroyReq(pReq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
|
||||||
|
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fileToRmReq(void* h, void* arg) {
|
bool fileToRmReq(void* h, void* arg) {
|
||||||
|
@ -1612,16 +1625,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
|
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
|
||||||
uv_strerror(status));
|
uv_strerror(status));
|
||||||
// queue set;
|
cliHandleBatch_shareConnExcept(pConn);
|
||||||
// transQueueRemoveByFilter(&pConn->reqsToSend, filteGetAll, NULL, &set, -1);
|
|
||||||
// transQueueRemoveByFilter(&pConn->reqsSentOut, filteGetAll, NULL, &set, -1);
|
|
||||||
// while (!QUEUE_IS_EMPTY(&set)) {
|
|
||||||
// queue* el = QUEUE_HEAD(&set);
|
|
||||||
// SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
|
||||||
// cliBuildExeceptMsg(pConn, pReq, &pReq->msg);
|
|
||||||
// destroyReq(pReq);
|
|
||||||
// }
|
|
||||||
cliHandleExcept(pConn, status);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pConn->connnected = 1;
|
pConn->connnected = 1;
|
||||||
|
@ -1881,7 +1885,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
|
||||||
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
|
SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid));
|
||||||
|
|
||||||
if (pState == NULL) {
|
if (pState == NULL) {
|
||||||
if (pReq->ctx == NULL || pReq->ctx->ahandle == 0) {
|
if (pReq->ctx == NULL) {
|
||||||
return TSDB_CODE_RPC_STATE_DROPED;
|
return TSDB_CODE_RPC_STATE_DROPED;
|
||||||
}
|
}
|
||||||
tDebug("failed to get statue, qid:%ld", qid);
|
tDebug("failed to get statue, qid:%ld", qid);
|
||||||
|
|
Loading…
Reference in New Issue