handle too many session
This commit is contained in:
parent
e44704b20e
commit
b8dfc71446
|
@ -280,11 +280,11 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
|
||||
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
|
||||
|
||||
rpcInit.failFastInterval = 1000; // interval threshold(ms)
|
||||
rpcInit.failFastInterval = 5000; // interval threshold(ms)
|
||||
rpcInit.failFastThreshold = 3; // failed threshold
|
||||
rpcInit.ffp = dmFailFastFp;
|
||||
|
||||
rpcInit.connLimit = 7500;
|
||||
rpcInit.connLimit = 3000;
|
||||
|
||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->clientRpc == NULL) {
|
||||
|
|
|
@ -40,9 +40,8 @@ typedef struct SCliConn {
|
|||
bool broken; // link broken or not
|
||||
ConnStatus status; //
|
||||
|
||||
int64_t refId;
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
int64_t refId;
|
||||
char* ip;
|
||||
|
||||
SDelayTask* task;
|
||||
|
||||
|
@ -86,6 +85,7 @@ typedef struct SCliThrd {
|
|||
SCvtAddr cvtAddr;
|
||||
|
||||
SHashObj* failFastCache;
|
||||
SHashObj* connLimit;
|
||||
|
||||
SCliMsg* stopMsg;
|
||||
|
||||
|
@ -570,10 +570,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
conn->status = ConnInPool;
|
||||
|
||||
if (conn->list == NULL) {
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
|
||||
} else {
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
}
|
||||
|
@ -751,6 +749,11 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
transReqQueueClear(&conn->wreqQueue);
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
|
||||
int32_t* oVal = taosHashGet(pThrd->connLimit, conn->ip, strlen(conn->ip));
|
||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1;
|
||||
taosHashPut(pThrd->connLimit, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal));
|
||||
|
||||
taosMemoryFree(conn);
|
||||
}
|
||||
static bool cliHandleNoResp(SCliConn* conn) {
|
||||
|
@ -892,8 +895,8 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
|
||||
tGError("%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s", CONN_GET_INST_LABEL(pConn),
|
||||
pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, pConn->port, uv_strerror(status));
|
||||
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
|
||||
pMsg ? TMSG_INFO(pMsg->msg.msgType) : 0, pConn, pConn->ip, uv_strerror(status));
|
||||
uv_timer_stop(pConn->timer);
|
||||
pConn->timer->data = NULL;
|
||||
taosArrayPush(pThrd->timerList, &pConn->timer);
|
||||
|
@ -901,12 +904,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
|
||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||
char* ip = pConn->ip;
|
||||
uint32_t port = pConn->port;
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
|
||||
int64_t cTimestamp = taosGetTimestampMs();
|
||||
if (item != NULL) {
|
||||
int32_t elapse = cTimestamp - item->timestamp;
|
||||
|
@ -918,7 +916,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
}
|
||||
} else {
|
||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
|
||||
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
|
||||
}
|
||||
}
|
||||
cliHandleExcept(pConn);
|
||||
|
@ -931,9 +929,13 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
cliHandleFastFail(pConn, status);
|
||||
return;
|
||||
}
|
||||
struct sockaddr peername, sockname;
|
||||
|
||||
int addrlen = sizeof(peername);
|
||||
int32_t* oVal = taosHashGet(pThrd->connLimit, pConn->ip, strlen(pConn->ip));
|
||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1;
|
||||
taosHashPut(pThrd->connLimit, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal));
|
||||
|
||||
struct sockaddr peername, sockname;
|
||||
int addrlen = sizeof(peername);
|
||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
|
||||
transSockInfo2Str(&peername, pConn->dst);
|
||||
|
||||
|
@ -1068,6 +1070,24 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
|||
return;
|
||||
}
|
||||
|
||||
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
int32_t* val = taosHashGet(pThrd->connLimit, key, strlen(key));
|
||||
if (val == NULL) return 0;
|
||||
|
||||
if (*val >= pTransInst->connLimit) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
@ -1091,7 +1111,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
if (item != NULL) {
|
||||
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
|
||||
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
|
||||
STraceId* trace = &(pMsg->msg.info.traceId);
|
||||
tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label,
|
||||
TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse);
|
||||
destroyCmsg(pMsg);
|
||||
|
@ -1113,6 +1132,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, pMsg)) {
|
||||
tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
|
||||
tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (conn != NULL) {
|
||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
@ -1126,10 +1152,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
||||
conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet));
|
||||
conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
|
||||
conn->ip = strdup(key);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, EPSET_GET_INUSE_IP(&pCtx->epSet));
|
||||
if (ipaddr == 0xffffffff) {
|
||||
uv_timer_stop(conn->timer);
|
||||
conn->timer->data = NULL;
|
||||
|
@ -1143,9 +1173,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
struct sockaddr_in addr;
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = ipaddr;
|
||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||
addr.sin_port = (uint16_t)htons(port);
|
||||
|
||||
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
|
||||
if (fd == -1) {
|
||||
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
|
||||
|
@ -1199,7 +1229,6 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
if (count >= 2) {
|
||||
tTrace("cli process batch size:%d", count);
|
||||
}
|
||||
// if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb);
|
||||
|
||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
|
||||
}
|
||||
|
@ -1412,6 +1441,7 @@ static SCliThrd* createThrdObj(void* trans) {
|
|||
pThrd->destroyAhandleFp = pTransInst->destroyFp;
|
||||
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pThrd->connLimit = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
||||
pThrd->quit = false;
|
||||
return pThrd;
|
||||
|
@ -1440,6 +1470,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
taosMemoryFree(pThrd->loop);
|
||||
taosHashCleanup(pThrd->fqdn2ipCache);
|
||||
taosHashCleanup(pThrd->failFastCache);
|
||||
taosHashCleanup(pThrd->connLimit);
|
||||
taosMemoryFree(pThrd);
|
||||
}
|
||||
|
||||
|
@ -1864,13 +1895,6 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
// read only
|
||||
if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_MAX_SESSIONS;
|
||||
}
|
||||
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||
|
@ -1912,13 +1936,6 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
|||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
// not limit sync req
|
||||
// read only
|
||||
// if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) {
|
||||
// transFreeMsg(pReq->pCont);
|
||||
// transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
// return TSDB_CODE_RPC_MAX_SESSIONS;
|
||||
//}
|
||||
|
||||
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||
tsem_init(sem, 0, 0);
|
||||
|
|
Loading…
Reference in New Issue