fix: limit session num

This commit is contained in:
yihaoDeng 2023-02-24 20:39:25 +08:00
parent 0bbd729995
commit dd2e9697b5
5 changed files with 134 additions and 32 deletions

View File

@ -23,12 +23,12 @@
#include "scheduler.h" #include "scheduler.h"
#include "tcache.h" #include "tcache.h"
#include "tglobal.h" #include "tglobal.h"
#include "thttp.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "ttime.h" #include "ttime.h"
#include "thttp.h"
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
@ -80,7 +80,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (pRequest->pQuery && pRequest->pQuery->pRoot) { if (pRequest->pQuery && pRequest->pQuery->pRoot) {
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pRequest->pQuery->pRoot)->sqlNodeType)) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type &&
(0 == ((SVnodeModifOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us", "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
@ -154,6 +155,11 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to server"); tscError("failed to init connection to server");
@ -488,7 +494,6 @@ void tscStopCrashReport() {
} }
} }
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) { void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
char *pMsg = NULL; char *pMsg = NULL;
const char *flags = "UTL FATAL "; const char *flags = "UTL FATAL ";
@ -507,7 +512,6 @@ void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo); taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
} }
void taos_init_imp(void) { void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet. // In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.

View File

@ -2008,6 +2008,11 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {
tscError("failed to init server status client"); tscError("failed to init server status client");

View File

@ -42,6 +42,7 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 2000; int32_t tsNumOfRpcSessions = 2000;
int32_t tsTimeToGetAvailableConn = 1000;
int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4;
@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
if (tsNumOfTaskQueueThreads >= 10) { if (tsNumOfTaskQueueThreads >= 10) {
@ -395,6 +402,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
@ -515,6 +525,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsTimeToGetAvailableConn = 1000;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
pItem->i32 = tsTimeToGetAvailableConn;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCommitThreads = numOfCores / 2; tsNumOfCommitThreads = numOfCores / 2;
@ -696,6 +714,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
return 0; return 0;
} }
@ -732,7 +754,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsNumOfRpcSessions = cfgGetItem(pCfg, "numofrpcsessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
@ -740,7 +764,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchTereads")->i32;
tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;

View File

@ -147,6 +147,7 @@ typedef struct {
int8_t epsetRetryCnt; int8_t epsetRetryCnt;
int32_t retryCode; int32_t retryCode;
void* task;
int hThrdIdx; int hThrdIdx;
} STransConnCtx; } STransConnCtx;

View File

@ -105,6 +105,7 @@ typedef struct SCliThrd {
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
SDelayQueue* delayQueue; SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue; SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
@ -614,8 +615,9 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) {
queue* h = QUEUE_HEAD(&(*msglist)->msgQ); queue* h = QUEUE_HEAD(&(*msglist)->msgQ);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg); transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn); cliSend(conn);
@ -1218,15 +1220,53 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
} }
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
STransMsg transMsg = {0};
transMsg.contLen = 0;
transMsg.pCont = NULL;
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
transMsg.msgType = pMsg->msg.msgType + 1;
transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId;
transMsg.info.hasEpSet = false;
if (pCtx->pSem != NULL) {
if (pCtx->pRsp == NULL) {
} else {
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
}
} else {
pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
}
destroyCmsg(pMsg);
}
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
pThrd->stopMsg = pMsg; pThrd->stopMsg = pMsg;
return; return;
} }
void** pIter = taosHashIterate(pThrd->connLimitCache, NULL);
while (pIter != NULL) {
SMsgList* list = (SMsgList*)(*pIter);
while (!QUEUE_IS_EMPTY(&list->msgQ)) {
queue* h = QUEUE_HEAD(&list->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
doNotifyApp(pMsg, pThrd);
}
pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter);
}
pThrd->stopMsg = NULL; pThrd->stopMsg = NULL;
pThrd->quit = true; pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
uv_walk(pThrd->loop, cliWalkCb, NULL); uv_walk(pThrd->loop, cliWalkCb, NULL);
} }
@ -1353,7 +1393,19 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) {
} }
return 0; return 0;
} }
static void doFreeTimeoutMsg(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst;
QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd);
taosMemoryFree(arg);
}
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
@ -1363,6 +1415,15 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs
} }
if ((*list)->numOfConn >= pTransInst->connLimitNum) { if ((*list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &pMsg->msg.info.traceId;
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); QUEUE_PUSH(&(*list)->msgQ, &pMsg->q);
return -1; return -1;
} }
@ -1846,6 +1907,8 @@ static SCliThrd* createThrdObj(void* trans) {
transDQCreate(pThrd->loop, &pThrd->timeoutQueue); transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
transDQCreate(pThrd->loop, &pThrd->waitConnQueue);
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans; pThrd->pTransInst = trans;
@ -1872,6 +1935,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid); tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@ -1910,6 +1974,10 @@ static void destroyThrdObj(SCliThrd* pThrd) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
destroyCmsg(pMsg); destroyCmsg(pMsg);
} }
taosMemoryFree(list); taosMemoryFree(list);