diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 8b46bbd064..1f6b205cdf 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -65,6 +65,7 @@ typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; int64_t timestamp; + int64_t dataSize; int32_t size; int8_t itype; int8_t reserved[3]; @@ -103,7 +104,7 @@ typedef struct STaosQall { STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); -void *taosAllocateQitem(int32_t size, EQItype itype); +void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize); void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e6584f4a00..e26512d065 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -190,8 +190,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, (*pRequest)->body.param = param; STscObj* pTscObj = (*pRequest)->pTscObj; - if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, - sizeof((*pRequest)->self))) { + int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, + sizeof((*pRequest)->self)); + if (err) { tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4352ec69d3..939acd2b6e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__ASK_EP; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__COMMIT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__REPORT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -1171,7 +1171,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { goto CREATE_MSG_FAIL; } if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); goto CREATE_MSG_FAIL; @@ -1204,7 +1204,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -1394,7 +1394,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 857fbcbce5..095857825d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -159,12 +159,12 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } if (pWorker == NULL) return -1; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 3e5ad65db7..28da0f9c5f 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -58,19 +58,19 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, is created and will put into qnode-query queue", pMsg); + dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen); taosWriteQitem(pMgmt->queryWorker.queue, pMsg); return 0; case READ_QUEUE: case FETCH_QUEUE: - dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); + dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); return 0; default: diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 2e66aeae37..9bd5be5201 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) { } int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; @@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SSnode *pSnode = pMgmt->pSnode; if (pSnode == NULL) { - dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), - TMSG_INFO(pMsg->msgType), qtype); + dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(), + TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 08ea880b97..7e3915f3d1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -233,7 +233,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; @@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } SMsgHead *pHead = pRpc->pCont; - dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen); pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 99f8bd002a..4fa09a46b7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -141,11 +141,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pRpc->info.wrapper = pWrapper; - pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); + dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen); code = dmProcessNodeMsg(pWrapper, pMsg); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fe76ba31d7..ffc357b2e8 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -769,7 +769,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p } int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); - for (int32_t i = 0; i < numOfQueries; ++i) { + for (int32_t i = 0; i < numOfQueries && numOfRows < rows; ++i) { SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); cols = 0; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6bd2ae3435..ba99e5515d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu tb_uid_t suid) { const SSubmitReq *pReq = (const SSubmitReq *)pMsg; - void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); + void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM, 0); if (!qItem) { return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 389c8013f9..09717c823e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1271,7 +1271,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); if (!failed) { - SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM); + SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; pRefBlock->dataRef = pRef; @@ -1303,7 +1303,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } #if 0 - SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); SSDataBlock block = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 01fbcf657f..ad264b8bb3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -121,7 +121,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { return &pInfo->blockData[1]; } - if (pIter->pSttBlk == NULL) { + if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) { return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 58fa24ad41..139347fd69 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2790,7 +2790,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; - bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + + bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); if (!hasNexTable) { @@ -3823,6 +3824,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. + // no valid error code set in metaGetTbTSchema, so let's set the error code here. + // we should proceed in case of tmq processing. if (pCond->suid != 0) { pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1); if (pReader->pSchema == NULL) { @@ -3840,7 +3843,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); } - STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader; + STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader; pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(p); @@ -3888,10 +3891,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; - _err: +_err: tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); + tsdbReaderClose(pReader); return code; - } +} void tsdbReaderClose(STsdbReader* pReader) { if (pReader == NULL) { diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 12fb449238..eff7a5ef93 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); + SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index c45226e02b..c2fa438c80 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM); + SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f7bee7d351..a82dc13318 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -91,11 +91,15 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); + + if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) { + continue; + } + // null value is represented by using key = INT64_MIN for now. // TODO: optimize to ignore null values for linear interpolation. if (!pLinearInfo->isStartSet) { if (!colDataIsNull_s(pColInfoData, rowIndex)) { - ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type)); pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); char* p = colDataGetData(pColInfoData, rowIndex); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 79549675a3..5b542dd54b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) { } if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); + SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); if (trigger == NULL) return; trigger->type = STREAM_INPUT__GET_RES; trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { } int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status; // enqueue @@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR } int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status = TASK_INPUT_STATUS__NORMAL; // enqueue diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 2fea5b9eca..6cc684dddf 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0); if (pDataSubmit == NULL) return NULL; pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); if (pDataSubmit->dataRef == NULL) goto FAIL; @@ -81,7 +81,7 @@ FAIL: } SStreamMergedSubmit* streamMergedSubmitNew() { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) return NULL; pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); @@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) } SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { - SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0); if (pSubmitClone == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d20c2902f5..20608a6cf3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { taosArrayDestroy(pRes); break; } - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("stream task %d exec end", pTask->taskId); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); streamFreeQitem(input); diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 415fecd9f2..4b305f823f 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { syncRpcMsgLog2(logBuf, pMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); STaosQueue *pMsgQ = gSyncIO->pMsgQ; @@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg); SSyncIO *io = pParent; SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); taosWriteQitem(io->pMsgQ, pTemp); } @@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) { SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg); taosWriteQitem(io->pMsgQ, pTemp); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 464559c1e0..4e2395b17b 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -128,7 +128,7 @@ void *processShellMsg(void *arg) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); int32_t idx = balance % multiQ->numOfThread; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 0f992184b5..42b6358893 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -109,20 +109,24 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { return memOfItems; } -void *taosAllocateQitem(int32_t size, EQItype itype) { +void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pNode->dataSize = dataSize; pNode->size = size; pNode->itype = itype; pNode->timestamp = taosGetTimestampUs(); if (itype == RPC_QITEM) { - int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size); + int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); if (alloced > tsRpcQueueMemoryAllowed) { + uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, + tsRpcQueueMemoryUsed); + atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); taosMemoryFree(pNode); terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; return NULL; @@ -139,8 +143,8 @@ void taosFreeQitem(void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); - if (pNode->itype > 0) { - int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size); + if (pNode->itype == RPC_QITEM) { + int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize); uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); } else { uTrace("item:%p, node:%p is freed", pItem, pNode);