fix: limit session num

This commit is contained in:
yihaoDeng 2023-02-24 21:20:15 +08:00
parent dd2e9697b5
commit 84706fe586
10 changed files with 16 additions and 11 deletions

View File

@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize;
// queue & threads // queue & threads
extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions; extern int32_t tsNumOfRpcSessions;
extern int32_t tsTimeToGetAvailableConn;
extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads; extern int32_t tsNumOfMnodeQueryThreads;

View File

@ -114,7 +114,7 @@ typedef struct SRpcInit {
int32_t connLimitNum; int32_t connLimitNum;
int32_t connLimitLock; int32_t connLimitLock;
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize; int32_t batchSize;
void *parent; void *parent;

View File

@ -159,6 +159,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {

View File

@ -2012,6 +2012,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {

View File

@ -41,8 +41,8 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 2000; int32_t tsNumOfRpcSessions = 10000;
int32_t tsTimeToGetAvailableConn = 1000; int32_t tsTimeToGetAvailableConn = 10000;
int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4;

View File

@ -292,6 +292,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitLock = 1; rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {

View File

@ -64,11 +64,11 @@ typedef struct {
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType); bool (*failFastFp)(tmsg_t msgType);
int32_t connLimitNum; int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize; int32_t batchSize;
int32_t timeToGetConn;
int index; int index;
void* parent; void* parent;
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization

View File

@ -90,7 +90,7 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) { if (pInit->user) {
tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
} }
pRpc->timeToGetConn = pInit->timeToGetConn;
pRpc->tcphandle = pRpc->tcphandle =
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);

View File

@ -1422,7 +1422,7 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200); pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); QUEUE_PUSH(&(*list)->msgQ, &pMsg->q);
return -1; return -1;

View File

@ -21,7 +21,7 @@ static void shellWorkAsClient() {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SEpSet epSet = {.inUse = 0, .numOfEps = 1};
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
void * clientRpc = NULL; void *clientRpc = NULL;
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass);
@ -31,6 +31,7 @@ static void shellWorkAsClient() {
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {