Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 17:30:57 +08:00
parent 1a70cc243a
commit c345383d5a
5 changed files with 19 additions and 11 deletions

View File

@ -131,6 +131,7 @@ typedef struct SRpcInit {
int32_t batchSize;
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t startReadTimer;
void *parent;
} SRpcInit;

View File

@ -109,7 +109,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t svrVer = 0;
(void)taosVersionStrToInt(version, &svrVer);
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer, pRpc->info.conn.clientIp);
dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer,
pRpc->info.conn.clientIp);
goto _OVER;
}
@ -391,7 +392,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.notWaitAvaliableConn = 0;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
rpcInit.startReadTimer = 1;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client since:%s", tstrerror(terrno));
@ -434,11 +435,12 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->statusRpc = rpcOpen(&rpcInit);
if (pTrans->statusRpc == NULL) {
dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
dError("failed to init dnode rpc status client since %s", tstrerror(terrno));
return terrno;
}
@ -480,7 +482,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
rpcInit.startReadTimer = 1;
pTrans->syncRpc = rpcOpen(&rpcInit);
if (pTrans->syncRpc == NULL) {
dError("failed to init dnode rpc sync client since %s", tstrerror(terrno));

View File

@ -78,6 +78,7 @@ typedef struct {
void* tcphandle; // returned handle from TCP initialization
int64_t refId;
int8_t shareConn;
int8_t startReadTimer;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -42,6 +42,8 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pRpc == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pRpc->startReadTimer = pInit->startReadTimer;
if (pInit->label) {
int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
memcpy(pRpc->label, pInit->label, len);

View File

@ -365,7 +365,12 @@ void cliResetConnTimer(SCliConn* conn) {
}
}
void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) {
void cliConnMayUpdateTimer(SCliConn* conn, int timeout) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
if (pInst->startReadTimer == 0) {
return;
}
if (conn->timer != NULL) {
// reset previous timer
cliResetConnTimer(conn);
@ -374,9 +379,6 @@ void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) {
if (reqsSentNum == 0) {
return;
}
cliConnCheckTimoutMsg(conn);
if (conn->timer == NULL) {
if (cliGetConnTimer(conn->hostThrd, conn) != 0) {
return;
@ -610,7 +612,7 @@ void cliHandleResp(SCliConn* conn) {
return;
}
cliConnMaySetReadTimeout(conn, READ_TIMEOUT);
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
@ -656,7 +658,7 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, pReq->msg.msgType);
tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pReq->msg.msgType));
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
@ -1151,7 +1153,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
return;
}
cliConnMaySetReadTimeout(conn, READ_TIMEOUT);
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);