fix: oom in rpc queue
This commit is contained in:
parent
9d015722c3
commit
79f1e90743
|
@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
|
||||||
|
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
pRpc->pCont = NULL;
|
pRpc->pCont = NULL;
|
||||||
|
|
|
@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRpc->info.wrapper = pWrapper;
|
pRpc->info.wrapper = pWrapper;
|
||||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
|
||||||
|
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // resp msg is not limited by tsRpcQueueMemoryUsed
|
||||||
|
pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen);
|
||||||
if (pMsg == NULL) goto _OVER;
|
if (pMsg == NULL) goto _OVER;
|
||||||
|
|
||||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||||
|
|
|
@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
qall->memOfItems = queue->memOfItems;
|
qall->memOfItems = queue->memOfItems;
|
||||||
|
qall->unAccessedNumOfItems = queue->numOfItems;
|
||||||
|
qall->unAccessMemOfItems = queue->memOfItems;
|
||||||
|
|
||||||
code = qall->numOfItems;
|
code = qall->numOfItems;
|
||||||
qinfo->ahandle = queue->ahandle;
|
qinfo->ahandle = queue->ahandle;
|
||||||
|
|
Loading…
Reference in New Issue