handle fastfail
This commit is contained in:
parent
cb35b453d4
commit
09e5ca7a0f
|
@ -579,7 +579,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
QUEUE_PUSH(&conn->list->conns, &conn->q);
|
QUEUE_PUSH(&conn->list->conns, &conn->q);
|
||||||
conn->list->size += 1;
|
conn->list->size += 1;
|
||||||
|
|
||||||
if (conn->list->size >= 50) {
|
if (conn->list->size >= 250) {
|
||||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||||
arg->param1 = conn;
|
arg->param1 = conn;
|
||||||
arg->param2 = thrd;
|
arg->param2 = thrd;
|
||||||
|
@ -882,47 +882,50 @@ void cliSend(SCliConn* pConn) {
|
||||||
_RETURN:
|
_RETURN:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||||
|
SCliThrd* pThrd = pConn->hostThrd;
|
||||||
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
|
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
|
||||||
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
|
|
||||||
|
tGError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn),
|
||||||
|
pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status));
|
||||||
|
uv_timer_stop(pConn->timer);
|
||||||
|
pConn->timer->data = NULL;
|
||||||
|
taosArrayPush(pThrd->timerList, &pConn->timer);
|
||||||
|
pConn->timer = NULL;
|
||||||
|
|
||||||
|
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||||
|
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||||
|
char* ip = pConn->ip;
|
||||||
|
uint32_t port = pConn->port;
|
||||||
|
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||||
|
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||||
|
|
||||||
|
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
|
||||||
|
int64_t cTimestamp = taosGetTimestampMs();
|
||||||
|
if (item != NULL) {
|
||||||
|
int32_t elapse = cTimestamp - item->timestamp;
|
||||||
|
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
|
||||||
|
item->count++;
|
||||||
|
} else {
|
||||||
|
item->count = 1;
|
||||||
|
item->timestamp = cTimestamp;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||||
|
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cliHandleExcept(pConn);
|
||||||
|
}
|
||||||
void cliConnCb(uv_connect_t* req, int status) {
|
void cliConnCb(uv_connect_t* req, int status) {
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
SCliThrd* pThrd = pConn->hostThrd;
|
SCliThrd* pThrd = pConn->hostThrd;
|
||||||
|
|
||||||
if (pConn->timer != NULL) {
|
|
||||||
uv_timer_stop(pConn->timer);
|
|
||||||
pConn->timer->data = NULL;
|
|
||||||
taosArrayPush(pThrd->timerList, &pConn->timer);
|
|
||||||
pConn->timer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
|
cliHandleFastFail(pConn, status);
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
|
||||||
|
|
||||||
tError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn),
|
|
||||||
pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status));
|
|
||||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
|
||||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
|
||||||
char* ip = pConn->ip;
|
|
||||||
uint32_t port = pConn->port;
|
|
||||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
|
||||||
|
|
||||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
|
|
||||||
int64_t cTimestamp = taosGetTimestampMs();
|
|
||||||
if (item != NULL) {
|
|
||||||
int32_t elapse = cTimestamp - item->timestamp;
|
|
||||||
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
|
|
||||||
item->count++;
|
|
||||||
} else {
|
|
||||||
item->count = 1;
|
|
||||||
item->timestamp = cTimestamp;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
|
||||||
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cliHandleExcept(pConn);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
struct sockaddr peername, sockname;
|
struct sockaddr peername, sockname;
|
||||||
|
@ -1163,15 +1166,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
|
cliHandleFastFail(conn, ret);
|
||||||
uv_err_name(ret));
|
|
||||||
|
|
||||||
uv_timer_stop(conn->timer);
|
|
||||||
conn->timer->data = NULL;
|
|
||||||
taosArrayPush(pThrd->timerList, &conn->timer);
|
|
||||||
conn->timer = NULL;
|
|
||||||
|
|
||||||
cliHandleExcept(conn);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||||
|
|
|
@ -246,11 +246,11 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (cost >= EXCEPTION_LIMIT_US) {
|
if (cost >= EXCEPTION_LIMIT_US) {
|
||||||
tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus, recv exception",
|
tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
|
||||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
||||||
transMsg.code, (int)(cost));
|
transMsg.code, (int)(cost));
|
||||||
} else {
|
} else {
|
||||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus",
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
|
||||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
||||||
transMsg.code, (int)(cost));
|
transMsg.code, (int)(cost));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue