commit
65b07fd157
|
@ -247,6 +247,9 @@ void *rpcOpen(const SRpcInit *pInit) {
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
if (pRpc->connType == TAOS_CONN_CLIENT) {
|
if (pRpc->connType == TAOS_CONN_CLIENT) {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads;
|
||||||
|
if (pRpc->numOfThreads >= 10) {
|
||||||
|
pRpc->numOfThreads = 10;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
}
|
}
|
||||||
|
@ -769,8 +772,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
|
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
|
||||||
tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, sid,
|
tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid,
|
||||||
hashstr, pConn->spi);
|
sid, hashstr, pConn->spi);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
|
|
|
@ -105,24 +105,23 @@ static void* clientThread(void* arg);
|
||||||
static void clientHandleResp(SCliConn* conn) {
|
static void clientHandleResp(SCliConn* conn) {
|
||||||
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
|
||||||
SRpcInfo* pRpc = pCtx->pTransInst;
|
SRpcInfo* pRpc = pCtx->pTransInst;
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||||
rpcMsg.pCont = transContFromHead(pHead);
|
rpcMsg.pCont = transContFromHead(pHead);
|
||||||
rpcMsg.code = pHead->code;
|
rpcMsg.code = pHead->code;
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
|
|
||||||
// rpcMsg.pCont = conn->readBuf.buf;
|
|
||||||
// rpcMsg.contLen = conn->readBuf.len;
|
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
|
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
conn->notifyCount += 1;
|
conn->notifyCount += 1;
|
||||||
|
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
|
tfree(conn->data);
|
||||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||||
|
|
||||||
// start thread's timer of conn pool if not active
|
// start thread's timer of conn pool if not active
|
||||||
|
@ -145,7 +144,7 @@ static void clientHandleExcept(SCliConn* pConn) {
|
||||||
rpcMsg.code = -1;
|
rpcMsg.code = -1;
|
||||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
|
tfree(pConn->data);
|
||||||
pConn->notifyCount += 1;
|
pConn->notifyCount += 1;
|
||||||
destroyTransConnCtx(pCtx);
|
destroyTransConnCtx(pCtx);
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
|
|
Loading…
Reference in New Issue