fix: control rpc qitem memory
This commit is contained in:
parent
0c6abb71c8
commit
50b66df39f
|
@ -65,6 +65,7 @@ typedef struct STaosQnode {
|
|||
STaosQnode *next;
|
||||
STaosQueue *queue;
|
||||
int64_t timestamp;
|
||||
int32_t dataSize;
|
||||
int32_t size;
|
||||
int8_t itype;
|
||||
int8_t reserved[3];
|
||||
|
@ -103,7 +104,7 @@ typedef struct STaosQall {
|
|||
STaosQueue *taosOpenQueue();
|
||||
void taosCloseQueue(STaosQueue *queue);
|
||||
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype);
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize);
|
||||
void taosFreeQitem(void *pItem);
|
||||
void taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||
|
|
|
@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) {
|
|||
int64_t refId = *(int64_t*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq != NULL) {
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
|
||||
*pTaskType = TMQ_DELAYED_TASK__ASK_EP;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
|||
int64_t refId = *(int64_t*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq != NULL) {
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
|
||||
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
|||
int64_t refId = *(int64_t*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq != NULL) {
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
|
||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
|
||||
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -1140,7 +1140,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
goto CREATE_MSG_FAIL;
|
||||
}
|
||||
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
||||
if (pRspWrapper == NULL) {
|
||||
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
|
||||
goto CREATE_MSG_FAIL;
|
||||
|
@ -1173,7 +1173,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
// handle meta rsp
|
||||
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
||||
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
||||
if (pRspWrapper == NULL) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
@ -1363,7 +1363,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tmqUpdateEp(tmq, head->epoch, &rsp);
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
} else {
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
||||
if (pWrapper == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
|
|
|
@ -159,7 +159,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
}
|
||||
|
||||
if (pWorker == NULL) return -1;
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) return -1;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
pRpc->pCont = NULL;
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) return -1;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
pRpc->pCont = NULL;
|
||||
|
|
|
@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) {
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
|
|
|
@ -233,7 +233,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) {
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
|
|
|
@ -141,7 +141,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
pRpc->info.wrapper = pWrapper;
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
|
||||
if (pMsg == NULL) goto _OVER;
|
||||
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
|
|
|
@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
|
|||
tb_uid_t suid) {
|
||||
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
|
||||
|
||||
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
|
||||
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM, 0);
|
||||
if (!qItem) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
|
@ -1265,7 +1265,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
|
||||
|
||||
if (!failed) {
|
||||
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM);
|
||||
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||
pRefBlock->pBlock = pDelBlock;
|
||||
pRefBlock->dataRef = pRef;
|
||||
|
@ -1297,7 +1297,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
}
|
||||
|
||||
#if 0
|
||||
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
|
||||
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
SSDataBlock block = {0};
|
||||
|
|
|
@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {
|
|||
|
||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM);
|
||||
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0);
|
||||
if (NULL == pBuf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
|||
|
||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
|
||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0);
|
||||
if (NULL == pBuf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
|
||||
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||
if (trigger == NULL) return;
|
||||
trigger->type = STREAM_INPUT__GET_RES;
|
||||
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
int8_t status;
|
||||
|
||||
// enqueue
|
||||
|
@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
|
|||
}
|
||||
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
// enqueue
|
||||
|
|
|
@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
|
|||
}
|
||||
|
||||
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
|
||||
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
|
||||
if (pDataSubmit == NULL) return NULL;
|
||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pDataSubmit->dataRef == NULL) goto FAIL;
|
||||
|
@ -81,7 +81,7 @@ FAIL:
|
|||
}
|
||||
|
||||
SStreamMergedSubmit* streamMergedSubmitNew() {
|
||||
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
|
||||
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
|
||||
if (pMerged == NULL) return NULL;
|
||||
pMerged->reqs = taosArrayInit(0, sizeof(void*));
|
||||
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
|
||||
|
@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit)
|
|||
}
|
||||
|
||||
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
|
||||
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
|
||||
if (pSubmitClone == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
taosArrayDestroy(pRes);
|
||||
break;
|
||||
}
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
if (qRes == NULL) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
qDebug("stream task %d exec end", pTask->taskId);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
if (qRes == NULL) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
streamFreeQitem(input);
|
||||
|
|
|
@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
STaosQueue *pMsgQ = gSyncIO->pMsgQ;
|
||||
|
@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
|
||||
SSyncIO *io = pParent;
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
taosWriteQitem(io->pMsgQ, pTemp);
|
||||
}
|
||||
|
@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pMsg, &rpcMsg);
|
||||
SRpcMsg *pTemp;
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
|
||||
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
|
||||
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
|
||||
taosWriteQitem(io->pMsgQ, pTemp);
|
||||
|
|
|
@ -128,7 +128,7 @@ void *processShellMsg(void *arg) {
|
|||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
int32_t idx = balance % multiQ->numOfThread;
|
||||
|
|
|
@ -109,20 +109,22 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
|
|||
return memOfItems;
|
||||
}
|
||||
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype) {
|
||||
void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) {
|
||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNode->dataSize = dataSize;
|
||||
pNode->size = size;
|
||||
pNode->itype = itype;
|
||||
pNode->timestamp = taosGetTimestampUs();
|
||||
|
||||
if (itype == RPC_QITEM) {
|
||||
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
|
||||
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
|
||||
if (alloced > tsRpcQueueMemoryAllowed) {
|
||||
uError("failed to alloc qitem, size:%d alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsRpcQueueMemoryUsed);
|
||||
taosMemoryFree(pNode);
|
||||
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
||||
return NULL;
|
||||
|
@ -139,8 +141,8 @@ void taosFreeQitem(void *pItem) {
|
|||
if (pItem == NULL) return;
|
||||
|
||||
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||
if (pNode->itype > 0) {
|
||||
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
|
||||
if (pNode->itype == RPC_QITEM) {
|
||||
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize);
|
||||
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
|
||||
} else {
|
||||
uTrace("item:%p, node:%p is freed", pItem, pNode);
|
||||
|
|
Loading…
Reference in New Issue