add perf
This commit is contained in:
parent
41067c55b3
commit
8c1f51c9e3
|
@ -125,6 +125,7 @@ typedef struct SRpcInit {
|
||||||
int32_t timeToGetConn;
|
int32_t timeToGetConn;
|
||||||
int8_t supportBatch; // 0: no batch, 1. batch
|
int8_t supportBatch; // 0: no batch, 1. batch
|
||||||
int32_t batchSize;
|
int32_t batchSize;
|
||||||
|
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
|
||||||
void *parent;
|
void *parent;
|
||||||
} SRpcInit;
|
} SRpcInit;
|
||||||
|
|
||||||
|
|
|
@ -387,6 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
rpcInit.batchSize = 8 * 1024;
|
rpcInit.batchSize = 8 * 1024;
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
|
rpcInit.notWaitAvaliableConn = 1;
|
||||||
|
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,8 @@ typedef struct {
|
||||||
int32_t failFastThreshold;
|
int32_t failFastThreshold;
|
||||||
int32_t failFastInterval;
|
int32_t failFastInterval;
|
||||||
|
|
||||||
|
int8_t notWaitAvaliableConn; // 1: no delay, 0: delay
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
bool (*retry)(int32_t code, tmsg_t msgType);
|
bool (*retry)(int32_t code, tmsg_t msgType);
|
||||||
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
||||||
|
|
|
@ -102,6 +102,8 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
if (pRpc->timeToGetConn == 0) {
|
if (pRpc->timeToGetConn == 0) {
|
||||||
pRpc->timeToGetConn = 10 * 1000;
|
pRpc->timeToGetConn = 10 * 1000;
|
||||||
}
|
}
|
||||||
|
pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
|
||||||
|
|
||||||
pRpc->tcphandle =
|
pRpc->tcphandle =
|
||||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
|
||||||
|
|
|
@ -708,7 +708,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
SMsgList* list = plist->list;
|
SMsgList* list = plist->list;
|
||||||
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
||||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||||
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
|
if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) {
|
||||||
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
|
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
|
||||||
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
|
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
|
||||||
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
|
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
|
||||||
|
|
Loading…
Reference in New Issue