add mem check

This commit is contained in:
yihaoDeng 2024-08-09 10:40:38 +08:00
parent 2af86c961a
commit 3eaae2ab75
1 changed files with 28 additions and 28 deletions

View File

@ -1164,7 +1164,7 @@ void cliSendBatch(SCliConn* pConn) {
goto _except; goto _except;
} }
int i = 0; int i = 0;
queue* h = NULL; queue* h = NULL;
QUEUE_FOREACH(h, &pBatch->wq) { QUEUE_FOREACH(h, &pBatch->wq) {
SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);
@ -1215,8 +1215,7 @@ void cliSendBatch(SCliConn* pConn) {
if (req == NULL) { if (req == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
taosMemoryFree(wb); goto _except;
return;
} }
req->data = pConn; req->data = pConn;
tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
@ -1225,12 +1224,15 @@ void cliSendBatch(SCliConn* pConn) {
code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
if (code != 0) { if (code != 0) {
tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code)); tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code));
goto _except;
} }
taosMemoryFree(wb); taosMemoryFree(wb);
return; return;
_except: _except:
cliDestroyBatch(pBatch); cliDestroyBatch(pBatch);
taosMemoryFree(wb);
pConn->pBatch = NULL; pConn->pBatch = NULL;
return; return;
} }
@ -2033,33 +2035,31 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1; (*ppBatchList)->len += 1;
continue;
}
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pMsg->msg.contLen;
pBatch->wLen += 1;
} else { } else {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
if (pBatch == NULL) { SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
destroyCmsg(pMsg); if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
continue; QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pMsg->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
destroyCmsg(pMsg);
continue;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
} }
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
} }
} }
continue; continue;