fix: limit session num

This commit is contained in:
yihaoDeng 2023-02-25 18:01:17 +08:00
parent b0116445e3
commit 97c4dfb570
3 changed files with 21 additions and 16 deletions

View File

@ -756,7 +756,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numofrpcsessions")->i32; tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;

View File

@ -93,15 +93,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
break; break;
} }
/* /*
pDnode is null, TD-22618 pDnode is null, TD-22618
at trans.c line 91 at trans.c line 91
before this line, dmProcessRpcMsg callback is set before this line, dmProcessRpcMsg callback is set
after this line, parent is set after this line, parent is set
so when dmProcessRpcMsg is called, pDonde is still null. so when dmProcessRpcMsg is called, pDonde is still null.
*/ */
if (pDnode != NULL){ if (pDnode != NULL) {
if(pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) { if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
dmProcessServerStartupStatus(pDnode, pRpc); dmProcessServerStartupStatus(pDnode, pRpc);
return; return;

View File

@ -606,9 +606,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
} }
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) { msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
@ -673,6 +672,12 @@ int32_t udfdOpenClientRpc() {
rpcInit.rfp = udfdRpcRfp; rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) { if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client"); fnError("failed to init dnode rpc client");
@ -765,7 +770,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
} }
void udfdHandleRequest(SUdfdUvConn *conn) { void udfdHandleRequest(SUdfdUvConn *conn) {
char *inputBuf = conn->inputBuf; char *inputBuf = conn->inputBuf;
int32_t inputLen = conn->inputLen; int32_t inputLen = conn->inputLen;
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
@ -784,7 +789,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void udfdPipeCloseCb(uv_handle_t *pipe) { void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data; SUdfdUvConn *conn = pipe->data;
SUvUdfWork* pWork = conn->pWorkList; SUvUdfWork *pWork = conn->pWorkList;
while (pWork != NULL) { while (pWork != NULL) {
pWork->conn = NULL; pWork->conn = NULL;
pWork = pWork->pWorkNext; pWork = pWork->pWorkNext;