Merge remote-tracking branch 'origin/3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-29 08:24:50 +08:00
parent 88222b5e86
commit 22f0d9b793
1 changed files with 16 additions and 7 deletions

View File

@ -1238,6 +1238,7 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
return true; return true;
} }
int32_t cliBatchSend(SCliConn* pConn) { int32_t cliBatchSend(SCliConn* pConn) {
int32_t code = 0;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
@ -1257,10 +1258,12 @@ int32_t cliBatchSend(SCliConn* pConn) {
} }
uv_buf_t* wb = NULL; uv_buf_t* wb = NULL;
if (pConn->bufSize < size) { if (pConn->bufSize < size) {
pConn->buf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t)); uv_buf_t* twb = (uv_buf_t*)taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t));
if (twb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pConn->buf = twb;
pConn->bufSize = size; pConn->bufSize = size;
taosMemoryFree(wb);
return TSDB_CODE_OUT_OF_MEMORY;
} }
wb = pConn->buf; wb = pConn->buf;
@ -1275,6 +1278,9 @@ int32_t cliBatchSend(SCliConn* pConn) {
STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); STransMsg* pReq = (STransMsg*)(&pCliMsg->msg);
if (pReq->pCont == 0) { if (pReq->pCont == 0) {
pReq->pCont = (void*)rpcMallocCont(0); pReq->pCont = (void*)rpcMallocCont(0);
if (pReq->pCont == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->contLen = 0; pReq->contLen = 0;
} }
@ -1327,17 +1333,16 @@ int32_t cliBatchSend(SCliConn* pConn) {
if (ret != 0) { if (ret != 0) {
tError("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); tError("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
freeWReqToWQ(&pConn->wq, req->data); freeWReqToWQ(&pConn->wq, req->data);
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_UNUSED(transUnrefCliHandle(pConn)); TAOS_UNUSED(transUnrefCliHandle(pConn));
} }
return 0; return code;
} }
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqsToSend, &pCliMsg->q); transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
code = cliBatchSend(pConn); return cliBatchSend(pConn);
return code;
} }
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
@ -1786,6 +1791,10 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
code = cliHandleState_mayUpdateState(pConn, pReq); code = cliHandleState_mayUpdateState(pConn, pReq);
} }
code = cliSendReq(pConn, pReq); code = cliSendReq(pConn, pReq);
if (code != 0) {
tWarn("%s conn %p failed to send req since %s", pInst->label, pConn, tstrerror(code));
TAOS_UNUSED(transUnrefCliHandle(pConn));
}
tTrace("%s conn %p ready", pInst->label, pConn); tTrace("%s conn %p ready", pInst->label, pConn);
return; return;