add mem check
This commit is contained in:
parent
a07adec754
commit
6be6027d17
|
@ -190,6 +190,7 @@ static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
|||
static void doFreeTimeoutMsg(void* param);
|
||||
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
|
||||
|
||||
static void cliDestroyBatch(SCliBatch* pBatch);
|
||||
// cli util func
|
||||
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
|
||||
|
@ -662,7 +663,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
|||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||
if (plist->list->numOfConn >= pTranInst->connLimitNum) {
|
||||
*exceed = true;
|
||||
return NULL;;
|
||||
return NULL;
|
||||
}
|
||||
plist->list->numOfConn++;
|
||||
return NULL;
|
||||
|
@ -708,7 +709,8 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
|||
SMsgList* list = plist->list;
|
||||
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||
if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) {
|
||||
if (pTransInst->notWaitAvaliableConn ||
|
||||
(pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) {
|
||||
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
|
||||
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
|
||||
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
|
||||
|
@ -903,11 +905,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
|||
exh->handle = conn;
|
||||
exh->pThrd = conn->hostThrd;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
|
||||
|
||||
conn->refId = exh->refId;
|
||||
taosWUnLockLatch(&exh->latch);
|
||||
|
||||
tDebug("conn %p specified by %"PRId64"", conn, handle);
|
||||
tDebug("conn %p specified by %" PRId64 "", conn, handle);
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||
return 0;
|
||||
|
@ -919,7 +921,6 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_
|
|||
int32_t code = transAllocBuffer(pBuf, buf);
|
||||
if (code < 0) {
|
||||
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
|
||||
// cliDestroyConn(conn, true);
|
||||
}
|
||||
}
|
||||
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||
|
@ -1081,9 +1082,8 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
(void)atomic_sub_fetch_32(&pThrd->connCount, 1);
|
||||
|
||||
if (conn->refId > 0) {
|
||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
}
|
||||
taosMemoryFree(conn->dstAddr);
|
||||
taosMemoryFree(conn->stream);
|
||||
|
@ -1149,6 +1149,7 @@ static void cliSendCb(uv_write_t* req, int status) {
|
|||
(void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||
}
|
||||
void cliSendBatch(SCliConn* pConn) {
|
||||
int32_t code = 0;
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
|
@ -1158,8 +1159,13 @@ void cliSendBatch(SCliConn* pConn) {
|
|||
pBatch->pList->connCnt += 1;
|
||||
|
||||
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
|
||||
int i = 0;
|
||||
if (wb == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
|
||||
goto _except;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
queue* h = NULL;
|
||||
QUEUE_FOREACH(h, &pBatch->wq) {
|
||||
SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
@ -1169,6 +1175,11 @@ void cliSendBatch(SCliConn* pConn) {
|
|||
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
|
||||
if (pMsg->pCont == 0) {
|
||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||
if (pMsg->pCont == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_BUFFER;
|
||||
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
|
||||
goto _except;
|
||||
}
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
|
||||
|
@ -1202,11 +1213,27 @@ void cliSendBatch(SCliConn* pConn) {
|
|||
}
|
||||
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
if (req == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
|
||||
taosMemoryFree(wb);
|
||||
return;
|
||||
}
|
||||
req->data = pConn;
|
||||
tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
pBatch->wLen, pBatch->batchSize);
|
||||
(void)uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
|
||||
|
||||
code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
|
||||
if (code != 0) {
|
||||
tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code));
|
||||
}
|
||||
taosMemoryFree(wb);
|
||||
return;
|
||||
|
||||
_except:
|
||||
cliDestroyBatch(pBatch);
|
||||
pConn->pBatch = NULL;
|
||||
return;
|
||||
}
|
||||
void cliSend(SCliConn* pConn) {
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
|
@ -1626,7 +1653,7 @@ static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
}
|
||||
return;
|
||||
_exception:
|
||||
tDebug("already free conn %p by id %" PRId64"", conn, refId);
|
||||
tDebug("already free conn %p by id %" PRId64 "", conn, refId);
|
||||
|
||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
||||
destroyCmsg(pMsg);
|
||||
|
@ -1920,6 +1947,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
|
|||
|
||||
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
||||
STrans* pInst = pThrd->pTransInst;
|
||||
int32_t code = 0;
|
||||
|
||||
int count = 0;
|
||||
while (!QUEUE_IS_EMPTY(wq)) {
|
||||
|
@ -1944,6 +1972,11 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
|
||||
if (ppBatchList == NULL || *ppBatchList == NULL) {
|
||||
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
||||
if (pBatchList == NULL) {
|
||||
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
destroyCmsg(pMsg);
|
||||
continue;
|
||||
}
|
||||
QUEUE_INIT(&pBatchList->wq);
|
||||
pBatchList->connMax = pInst->connLimitNum;
|
||||
pBatchList->connCnt = 0;
|
||||
|
@ -1952,23 +1985,45 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
|
||||
pBatchList->ip = taosStrdup(ip);
|
||||
pBatchList->dst = taosStrdup(key);
|
||||
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
|
||||
taosMemoryFree(pBatchList->ip);
|
||||
taosMemoryFree(pBatchList->dst);
|
||||
taosMemoryFree(pBatchList);
|
||||
destroyCmsg(pMsg);
|
||||
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
continue;
|
||||
}
|
||||
pBatchList->port = port;
|
||||
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
QUEUE_INIT(&pBatch->wq);
|
||||
QUEUE_INIT(&pBatch->listq);
|
||||
if (pBatch == NULL) {
|
||||
tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
destroyCmsg(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
QUEUE_PUSH(&pBatch->wq, h);
|
||||
pBatch->wLen += 1;
|
||||
pBatch->batchSize += pMsg->msg.contLen;
|
||||
pBatch->pList = pBatchList;
|
||||
|
||||
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
|
||||
|
||||
(void)taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
|
||||
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pBatch);
|
||||
taosMemoryFree(pBatchList->ip);
|
||||
taosMemoryFree(pBatchList->dst);
|
||||
taosMemoryFree(pBatchList);
|
||||
tError("failed to put batch list to cache, reason:%s", tstrerror(code));
|
||||
destroyCmsg(pMsg);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
if (pBatch == NULL) {
|
||||
destroyCmsg(pMsg);
|
||||
continue;
|
||||
}
|
||||
QUEUE_INIT(&pBatch->wq);
|
||||
QUEUE_INIT(&pBatch->listq);
|
||||
|
||||
|
@ -1991,6 +2046,11 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
pBatch->wLen += 1;
|
||||
} else {
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
if (pBatch == NULL) {
|
||||
destroyCmsg(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
QUEUE_INIT(&pBatch->wq);
|
||||
QUEUE_INIT(&pBatch->listq);
|
||||
|
||||
|
@ -3269,7 +3329,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) {
|
|||
return TSDB_CODE_RPC_MODULE_QUIT;
|
||||
}
|
||||
if (transpointId == 0) {
|
||||
tDebug("not free by refId:%"PRId64"", transpointId);
|
||||
tDebug("not free by refId:%" PRId64 "", transpointId);
|
||||
TAOS_CHECK_GOTO(0, NULL, _exception);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ int32_t transCompressMsg(char* msg, int32_t len) {
|
|||
|
||||
char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes
|
||||
if (buf == NULL) {
|
||||
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
||||
tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
||||
ret = len;
|
||||
return ret;
|
||||
}
|
||||
|
@ -206,6 +206,8 @@ int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
|||
p->cap = p->left + p->len;
|
||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||
if (p->buf == NULL) {
|
||||
uvBuf->base = NULL;
|
||||
uvBuf->len = 0;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
uvBuf->base = p->buf + p->len;
|
||||
|
@ -439,7 +441,10 @@ void transReqQueueClear(queue* q) {
|
|||
}
|
||||
|
||||
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
|
||||
queue->q = taosArrayInit(2, sizeof(void*));
|
||||
queue->q = taosArrayInit(4, sizeof(void*));
|
||||
if (taosArrayReserve(queue->q, 4) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
queue->freeFunc = (void (*)(const void*))freeFunc;
|
||||
|
||||
if (queue->q == NULL) {
|
||||
|
|
|
@ -205,7 +205,6 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
|||
int32_t code = transAllocBuffer(pBuf, buf);
|
||||
if (code < 0) {
|
||||
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
|
||||
// destroyConn(conn, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -542,6 +541,9 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
buf->len = 2;
|
||||
buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len);
|
||||
if (buf == NULL) {
|
||||
tError("failed to alloc conn read buffer since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
}
|
||||
}
|
||||
|
||||
void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||
|
|
Loading…
Reference in New Issue