opt: opt tag index

This commit is contained in:
yihaoDeng 2023-02-15 17:26:11 +08:00
parent 009b6a61fa
commit e44704b20e
6 changed files with 25 additions and 1 deletions

View File

@ -112,6 +112,8 @@ typedef struct SRpcInit {
// fail fast fp // fail fast fp
RpcFFfp ffp; RpcFFfp ffp;
int32_t connLimit;
void *parent; void *parent;
} SRpcInit; } SRpcInit;

View File

@ -284,6 +284,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
rpcInit.connLimit = 7500;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client"); dError("failed to init dnode rpc client");

View File

@ -64,6 +64,7 @@ typedef struct {
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType); bool (*failFastFp)(tmsg_t msgType);
int32_t connLimit;
int index; int index;
void* parent; void* parent;
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization

View File

@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp; pRpc->failFastFp = pInit->ffp;
pRpc->connLimit = pInit->connLimit;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) { if (pRpc->numOfThreads <= 0) {

View File

@ -80,6 +80,7 @@ typedef struct SCliThrd {
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
int connCount;
void (*destroyAhandleFp)(void* ahandle); void (*destroyAhandleFp)(void* ahandle);
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
@ -671,7 +672,6 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
// transSetConnOption((uv_tcp_t*)conn->stream);
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
@ -694,6 +694,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn->broken = 0; conn->broken = 0;
transRefCliHandle(conn); transRefCliHandle(conn);
atomic_add_fetch_32(&pThrd->connCount, 1);
allocConnRef(conn, false); allocConnRef(conn, false);
return conn; return conn;
@ -738,6 +739,8 @@ static void cliDestroy(uv_handle_t* handle) {
conn->timer = NULL; conn->timer = NULL;
} }
atomic_sub_fetch_32(&pThrd->connCount, 1);
transReleaseExHandle(transGetRefMgt(), conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
@ -1861,6 +1864,13 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
// read only
if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_MAX_SESSIONS;
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
@ -1902,6 +1912,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
// not limit sync req
// read only
// if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) {
// transFreeMsg(pReq->pCont);
// transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
// return TSDB_CODE_RPC_MAX_SESSIONS;
//}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0); tsem_init(sem, 0, 0);

View File

@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")