Merge pull request #26418 from taosdata/fix/TD-30736-3.0a
fix: oom in rpc queue
This commit is contained in:
commit
854f52dd0a
|
@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
|
||||||
|
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
pRpc->pCont = NULL;
|
pRpc->pCont = NULL;
|
||||||
|
|
|
@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRpc->info.wrapper = pWrapper;
|
pRpc->info.wrapper = pWrapper;
|
||||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
|
||||||
|
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed
|
||||||
|
pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||||
if (pMsg == NULL) goto _OVER;
|
if (pMsg == NULL) goto _OVER;
|
||||||
|
|
||||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||||
|
|
|
@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
|
||||||
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
|
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
|
||||||
if (alloced > tsRpcQueueMemoryAllowed) {
|
if (alloced > tsRpcQueueMemoryAllowed) {
|
||||||
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
|
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
|
||||||
tsRpcQueueMemoryUsed);
|
tsRpcQueueMemoryAllowed);
|
||||||
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
|
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
|
||||||
taosMemoryFree(pNode);
|
taosMemoryFree(pNode);
|
||||||
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
||||||
|
@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->memOfItems = queue->memOfItems;
|
qall->memOfItems = queue->memOfItems;
|
||||||
|
qall->unAccessedNumOfItems = queue->numOfItems;
|
||||||
|
qall->unAccessMemOfItems = queue->memOfItems;
|
||||||
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
qinfo->ahandle = queue->ahandle;
|
qinfo->ahandle = queue->ahandle;
|
||||||
|
|
Loading…
Reference in New Issue