diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 130de8c030..6aa9f1bb5e 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -29,6 +29,9 @@ extern "C" { #define DS_BUF_FULL 2 #define DS_BUF_EMPTY 3 +#define DS_FLAG_USE_MEMPOOL (1 << 0) + + struct SSDataBlock; typedef struct SDeleterRes { @@ -131,6 +134,9 @@ void dsScheduleProcess(void* ahandle, void* pItem); */ void dsDestroyDataSinker(DataSinkHandle handle); +int32_t dsGetSinkFlags(DataSinkHandle handle, uint64_t* pFlags); + + #ifdef __cplusplus } #endif diff --git a/include/os/os.h b/include/os/os.h index ac1a750b78..67e9150464 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -109,6 +109,7 @@ extern "C" { #include "osLz4.h" #include "osMath.h" #include "osMemory.h" +#include "osMemPool.h" #include "osRand.h" #include "osSemaphore.h" #include "osSignal.h" diff --git a/include/util/tmempool.h b/include/os/osMemPool.h similarity index 92% rename from include/util/tmempool.h rename to include/os/osMemPool.h index d6fe6e5ca2..5507e99597 100644 --- a/include/util/tmempool.h +++ b/include/os/osMemPool.h @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#ifndef _TD_UTIL_MEMPOOL_H_ -#define _TD_UTIL_MEMPOOL_H_ +#ifndef _TD_OS_MEMPOOL_H_ +#define _TD_OS_MEMPOOL_H_ #include "os.h" @@ -50,6 +50,14 @@ void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo); void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); +void taosAutoMemoryFree(void *ptr); +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession); +void taosMemPoolDestroySession(void* session); + + +extern threadlocal void* threadPoolHandle; +extern threadlocal void* threadPoolSession; + #define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0) #define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL) @@ -76,4 +84,4 @@ void taosMemPoolModDestroy(void); } #endif -#endif /*_TD_UTIL_MEMPOOL_H_*/ +#endif /*_TD_OS_MEMPOOL_H_*/ diff --git a/include/os/osMemory.h b/include/os/osMemory.h index e43e7d7cf6..71adb2bedc 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -36,11 +36,6 @@ extern "C" { #endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) -// #define taosMemoryMalloc malloc -// #define taosMemoryCalloc calloc -// #define taosMemoryRealloc realloc -// #define taosMemoryFree free - int32_t taosMemoryDbgInit(); int32_t taosMemoryDbgInitRestore(); void *taosMemMalloc(int64_t size); @@ -61,6 +56,7 @@ void *taosMemMallocAlign(uint32_t alignment, int64_t size); } \ } while (0) +#include "osMemPool.h" #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 19b6655af1..01ebdec173 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -1215,7 +1215,7 @@ static void *hbThreadFunc(void *param) { pInfo->msgType = TDMT_MND_HEARTBEAT; pInfo->param = taosMemoryMalloc(sizeof(int32_t)); *(int32_t *)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; + pInfo->paramFreeFp = taosMemFree; pInfo->requestId = generateRequestId(); pInfo->requestObjRefId = 0; diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 9e990dd545..a7248405e5 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -124,7 +124,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO pInfo->msgType = TDMT_MND_STATIS; // pInfo->param = taosMemoryMalloc(sizeof(int32_t)); // *(int32_t*)pInfo->param = i; - pInfo->paramFreeFp = taosMemoryFree; + pInfo->paramFreeFp = taosMemFree; pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index becb8285b6..f164b64762 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -423,7 +423,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { void tmq_list_destroy(tmq_list_t* list) { if (list == NULL) return; SArray* container = &list->container; - taosArrayDestroyP(container, taosMemoryFree); + taosArrayDestroyP(container, NULL); } int32_t tmq_list_get_size(const tmq_list_t* list) { @@ -509,7 +509,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->param = pParam; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemFree; pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; @@ -864,7 +864,7 @@ void tmqSendHbReq(void* param, void* tmrId) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->paramFreeFp = taosMemoryFree; + sendInfo->paramFreeFp = taosMemFree; sendInfo->param = taosMemoryMalloc(sizeof(int64_t)); *(int64_t*)sendInfo->param = refId; sendInfo->fp = tmqHbCb; @@ -1315,7 +1315,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } FAIL: - taosArrayDestroyP(req.topicNames, taosMemoryFree); + taosArrayDestroyP(req.topicNames, NULL); return code; } @@ -3184,7 +3184,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; - sendInfo->paramFreeFp = taosMemoryFree; + sendInfo->paramFreeFp = taosMemFree; sendInfo->fp = tmqGetWalInfoCb; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1d66a8c323..92d6870c05 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9447,11 +9447,11 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) { static void tDeleteMqDataRspCommon(void *rsp) { SMqDataRspCommon *pRsp = rsp; pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); - taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockData, NULL); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); pRsp->blockSchema = NULL; - taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockTbName, NULL); pRsp->blockTbName = NULL; tOffsetDestroy(&pRsp->reqOffset); tOffsetDestroy(&pRsp->rspOffset); @@ -9500,7 +9500,7 @@ void tDeleteSTaosxRsp(void *rsp) { STaosxRsp *pRsp = (STaosxRsp *)rsp; pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); - taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->createTableReq, NULL); pRsp->createTableReq = NULL; } @@ -10787,7 +10787,7 @@ int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pMetaBuff); - taosArrayDestroyP(pRsp->batchMetaReq, taosMemoryFree); + taosArrayDestroyP(pRsp->batchMetaReq, NULL); taosArrayDestroy(pRsp->batchMetaLen); pRsp->batchMetaReq = NULL; pRsp->batchMetaLen = NULL; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9a7a8155ec..79de5ff4dd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -696,7 +696,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { _over: mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); - taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); + taosArrayDestroyP(subscribe.topicNames, NULL); return code; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5164557184..3d2ba785d5 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -317,10 +317,10 @@ END: void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) { if (pConsumer == NULL) return; - taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); - taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); - taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); - taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); + taosArrayDestroyP(pConsumer->currentTopics, NULL); + taosArrayDestroyP(pConsumer->rebNewTopics, NULL); + taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL); + taosArrayDestroyP(pConsumer->assignedTopics, NULL); } void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 73542bbb1e..ec76ce7200 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3952,7 +3952,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { taosHashCleanup(p->pDbMap); } if (p->pResTbNames) { - taosArrayDestroyP(p->pResTbNames, taosMemoryFree); + taosArrayDestroyP(p->pResTbNames, NULL); } if (p->pTsmaMap) { void *pIter = taosHashIterate(p->pTsmaMap, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 2345cc599b..f0807ed6e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -925,7 +925,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { arg->tsdb = fs->tsdb; arg->fid = fset->fid; - code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL); + code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemFree, arg, NULL); TSDB_CHECK_CODE(code, lino, _exit); fset->mergeScheduled = true; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2264e2779b..090b3c4b9b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1175,7 +1175,7 @@ _exit: taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncoderClear(&encoder); - taosArrayDestroyP(tbNames, taosMemoryFree); + taosArrayDestroyP(tbNames, NULL); return rcode; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index bbd9b39f6c..db12b4e52a 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -724,7 +724,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_QNODE_LIST; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -778,7 +778,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_DNODE_LIST; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse); @@ -829,7 +829,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU int32_t msgLen = 0; int32_t reqType = TDMT_MND_USE_DB; SCtgTask* pTask = tReq ? tReq->pTask : NULL; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); @@ -881,7 +881,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_DB_CFG; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); @@ -936,7 +936,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_INDEX; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get index from mnode, indexName:%s", indexName); @@ -991,7 +991,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_TABLE_INDEX; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); @@ -1048,7 +1048,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_RETRIEVE_FUNC; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get udf info from mnode, funcName:%s", funcName); @@ -1103,7 +1103,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_USER_AUTH; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get user auth from mnode, user:%s", user); @@ -1163,7 +1163,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const int32_t reqType = TDMT_MND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, tbName); - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName); @@ -1226,7 +1226,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa int32_t reqType = TDMT_VND_TABLE_META; char tbFName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFName, "%s.%s", dbFName, pTableName->tname); - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId, @@ -1295,7 +1295,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t reqType = TDMT_VND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1360,7 +1360,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S int32_t reqType = TDMT_MND_TABLE_CFG; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFName); - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname}; @@ -1412,7 +1412,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_SERVER_VERSION; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; qDebug("try to get svr ver from mnode"); @@ -1463,7 +1463,7 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* int32_t msgLen = 0; int32_t reqType = TDMT_MND_VIEW_META; SCtgTask* pTask = tReq ? tReq->pTask : NULL; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -1517,7 +1517,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa char* msg = NULL; int32_t msgLen = 0; SCtgTask* pTask = tReq ? tReq->pTask : NULL; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); @@ -1575,7 +1575,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTbName, tbFName); SCtgTask* pTask = tReq ? tReq->pTask : NULL; - void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont; SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId, diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 01bc762a1d..805a20d85e 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -39,6 +39,7 @@ typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, i typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); +typedef int32_t (*FGetSinkFlags)(struct SDataSinkHandle* pHandle, uint64_t* flags); typedef struct SDataSinkHandle { FPutDataBlock fPut; @@ -48,6 +49,7 @@ typedef struct SDataSinkHandle { FGetDataBlock fGetData; FDestroyDataSinker fDestroy; FGetCacheSize fGetCacheSize; + FGetSinkFlags fGetFlags; } SDataSinkHandle; int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 15288c4406..6b9d7a1179 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -50,6 +50,7 @@ typedef struct SDataDeleterHandle { bool queryEnd; uint64_t useconds; uint64_t cachedSize; + uint64_t flags; TdThreadMutex mutex; } SDataDeleterHandle; @@ -239,6 +240,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { return TSDB_CODE_SUCCESS; } + +static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { + SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle; + + *pFlags = atomic_load_64(&pDispatcher->flags); + return TSDB_CODE_SUCCESS; +} + + int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { int32_t code = TSDB_CODE_SUCCESS; @@ -257,6 +267,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->sink.fGetData = getDataBlock; deleter->sink.fDestroy = destroyDataSinker; deleter->sink.fGetCacheSize = getCacheSize; + deleter->sink.fGetFlags = getSinkFlags; deleter->pManager = pManager; deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; @@ -276,6 +287,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } + deleter->flags = DS_FLAG_USE_MEMPOOL; *pHandle = deleter; return code; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 297c87ab40..324daea428 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -49,6 +49,7 @@ typedef struct SDataDispatchHandle { bool queryEnd; uint64_t useconds; uint64_t cachedSize; + uint64_t flags; void* pCompressBuf; int32_t bufSize; TdThreadMutex mutex; @@ -290,6 +291,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { return TSDB_CODE_SUCCESS; } + +static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + + *pFlags = atomic_load_64(&pDispatcher->flags); + return TSDB_CODE_SUCCESS; +} + + int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { @@ -304,7 +314,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fGetCacheSize = getCacheSize; - + dispatcher->sink.fGetFlags = getSinkFlags; dispatcher->pManager = pManager; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->status = DS_BUF_EMPTY; @@ -318,6 +328,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD goto _return; } + dispatcher->flags = DS_FLAG_USE_MEMPOOL; *pHandle = dispatcher; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5ba2f8bf42..4cd2aa93fb 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -45,6 +45,7 @@ typedef struct SDataInserterHandle { bool fullOrderColList; uint64_t useconds; uint64_t cachedSize; + uint64_t flags; TdThreadMutex mutex; tsem_t ready; bool explain; @@ -113,7 +114,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int pParam->pInserter = pInserter; pMsgSendInfo->param = pParam; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemFree; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = msgLen; pMsgSendInfo->msgType = TDMT_VND_SUBMIT; @@ -396,6 +397,13 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { return TSDB_CODE_SUCCESS; } +static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { + SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle; + + *pFlags = atomic_load_64(&pDispatcher->flags); + return TSDB_CODE_SUCCESS; +} + int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); @@ -412,6 +420,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->sink.fGetData = NULL; inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fGetCacheSize = getCacheSize; + inserter->sink.fGetFlags = getSinkFlags; inserter->pManager = pManager; inserter->pNode = pInserterNode; inserter->pParam = pParam; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index e711ffdf5c..471d0f3fad 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -100,3 +100,9 @@ void dsDestroyDataSinker(DataSinkHandle handle) { pHandleImpl->fDestroy(pHandleImpl); taosMemoryFree(pHandleImpl); } + +int32_t dsGetSinkFlags(DataSinkHandle handle, uint64_t* pFlags) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + return pHandleImpl->fGetFlags(pHandleImpl, pFlags); +} + diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c527224438..ddb083d6e4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -486,7 +486,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pDataInfo->startTime = taosGetTimestampUs(); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); + SFetchRspHandleWrapper* pWrapper = taosMemCalloc(1, sizeof(SFetchRspHandleWrapper)); pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; @@ -496,7 +496,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); loadRemoteDataCallback(pWrapper, &pBuf, code); - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); } else { SResFetchReq req = {0}; req.header.vgId = pSource->addr.nodeId; @@ -511,7 +511,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pDataInfo->pSrcUidList = NULL; if (TSDB_CODE_SUCCESS != code) { pTaskInfo->code = code; - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); return pTaskInfo->code; } } @@ -519,7 +519,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); if (msgSize < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } @@ -527,14 +527,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas void* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); taosMemoryFree(msg); freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; @@ -547,17 +547,17 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pSource->execId, pExchangeInfo, sourceIndex, totalSources); // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { taosMemoryFreeClear(msg); - taosMemoryFree(pWrapper); + taosMemFree(pWrapper); qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return pTaskInfo->code; } pMsgSendInfo->param = pWrapper; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemFree; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgType = pSource->fetchMsgType; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8b151edb27..47af61cda1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -671,7 +671,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime if (pOperatorInfo->limit == 0) return true; if (pOperatorInfo->pBQ == NULL) { - pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo); + pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, NULL, pOperatorInfo); } bool shouldFilter = false; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f118c15b7a..14203cc9ee 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -845,7 +845,7 @@ void nodesDestroyNode(SNode* pNode) { taosMemoryFreeClear(pReal->pMeta); taosMemoryFreeClear(pReal->pVgroupList); taosArrayDestroyEx(pReal->pSmaIndexes, destroySmaIndex); - taosArrayDestroyP(pReal->tsmaTargetTbVgInfo, taosMemoryFree); + taosArrayDestroyP(pReal->tsmaTargetTbVgInfo, NULL); taosArrayDestroy(pReal->tsmaTargetTbInfo); break; } @@ -1365,7 +1365,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pLogicNode->pTags); nodesDestroyNode(pLogicNode->pSubtable); taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam); - taosArrayDestroyP(pLogicNode->pTsmaTargetTbVgInfo, taosMemoryFree); + taosArrayDestroyP(pLogicNode->pTsmaTargetTbVgInfo, NULL); taosArrayDestroy(pLogicNode->pTsmaTargetTbInfo); break; } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 02f262cfb1..a0994d2e6e 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -45,7 +45,7 @@ int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placehol int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { SAstCreateContext cxt; initAstCreateContext(pParseCxt, &cxt); - void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc); + void* pParser = ParseAlloc((FMalloc)taosMemMalloc); int32_t i = 0; while (1) { SToken t0 = {0}; @@ -86,7 +86,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { } abort_parse: - ParseFree(pParser, (FFree)taosMemoryFree); + ParseFree(pParser, (FFree)taosMemFree); if (TSDB_CODE_SUCCESS == cxt.errCode) { int32_t code = buildQueryAfterParse(pQuery, cxt.pRootNode, cxt.placeholderNo, &cxt.pPlaceholderValues); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b01377010a..8dd91825ee 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3766,7 +3766,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo if (TSDB_CODE_SUCCESS == code && pRealTable->pTsmas && (pRealTable->pMeta->tableType == TSDB_CHILD_TABLE || pRealTable->pMeta->tableType == TSDB_NORMAL_TABLE)) { if (pRealTable->tsmaTargetTbVgInfo) { - taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, taosMemoryFree); + taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, NULL); pRealTable->tsmaTargetTbVgInfo = NULL; } char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1]; @@ -5835,7 +5835,7 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt* taosMemoryFree(vgsInfo); } } - taosArrayDestroyP(pTbNames, taosMemoryFree); + taosArrayDestroyP(pTbNames, NULL); if (code) break; } } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index d4c8fc103e..d4832ade40 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -141,6 +141,7 @@ typedef struct SQWTaskCtx { bool queryExecDone; bool queryInQueue; bool explainRsped; + bool sinkWithMemPool; int32_t rspCode; int64_t affectedRows; // for insert ...select stmt @@ -227,6 +228,17 @@ typedef struct SQWorkerMgmt { #define QW_IDS() sId, qId, tId, rId, eId #define QW_FPARAMS() mgmt, QW_IDS() +extern void* gQueryPoolHandle; + +#define QW_SINK_ENABLE_MEMPOOL(_ctx) \ + do { \ + if ((_ctx)->sinkWithMemPool) { \ + taosEnableMemoryPoolUsage(gQueryPoolHandle, (_ctx)->memPoolSession); \ + } \ + } while (0) + +#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemoryPoolUsage() + #define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) #define QW_STAT_GET(_item) atomic_load_64(&(_item)) @@ -418,6 +430,7 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); void qwDbgSimulateSleep(void); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +void qwInitQueryPool(void); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 40cf29057d..316dfcfc8a 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -15,7 +15,7 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { return TSDB_CODE_SUCCESS; } -int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int64_t* chunkSize) { +int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) { *chunkSize = 2 * 1048576; return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index d1988c3904..9a1b94a283 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -308,9 +308,9 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { qwFreeTaskHandle(&ctx->taskHandle); if (ctx->sinkHandle) { - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsDestroyDataSinker(ctx->sinkHandle); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); ctx->sinkHandle = NULL; qDebug("sink handle destroyed"); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 31b9ba303e..e609df4287 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,7 +18,7 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -static TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; +TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; int32_t qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, tId, sId; @@ -103,9 +103,9 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } if (!ctx->needFetch) { - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); } } @@ -174,9 +174,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { SSDataBlock *pRes = taosArrayGetP(pResList, j); SInputData inputData = {.pData = pRes}; - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); + if (code) { QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -204,9 +205,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ctx->queryExecDone = true; } - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsEndPut(sinkHandle, useconds); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (queryStop) { *queryStop = true; @@ -312,9 +313,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *pRawDataLen = 0; while (true) { - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (len < 0) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); @@ -323,9 +324,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (len == 0) { if (queryEnd) { - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); code = dsGetDataBlock(ctx->sinkHandle, &output); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); @@ -369,9 +370,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ((int32_t *)output.pData)[1] = rawLen; output.pData += sizeof(int32_t) * 2; - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); code = dsGetDataBlock(ctx->sinkHandle, &output); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); @@ -422,9 +423,9 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes int32_t code = 0; SOutputData output = {0}; - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (len <= 0 || len != sizeof(SDeleterRes)) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); @@ -436,9 +437,9 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); code = dsGetDataBlock(ctx->sinkHandle, &output); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); @@ -514,9 +515,9 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg ctx->queryEnd = false; #endif - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + QW_SINK_ENABLE_MEMPOOL(ctx); dsReset(ctx->sinkHandle); - taosDisableMemoryPoolUsage(); + QW_SINK_DISABLE_MEMPOOL(); qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); @@ -788,8 +789,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } + uint64_t flags = 0; + dsGetSinkFlags(sinkHandle, &flags); + ctx->level = plan->level; ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); + ctx->sinkWithMemPool = flags & DS_FLAG_USE_MEMPOOL; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); @@ -1273,6 +1278,11 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { ctx.taskHandle = pTaskInfo; ctx.sinkHandle = sinkHandle; + uint64_t flags = 0; + dsGetSinkFlags(sinkHandle, &flags); + + ctx.sinkWithMemPool = flags & DS_FLAG_USE_MEMPOOL; + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL)); QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d0dc04d6b4..791dd57fc9 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -617,7 +617,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - msgSendInfo->paramFreeFp = taosMemoryFree; + msgSendInfo->paramFreeFp = taosMemFree; SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param)); SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp)); @@ -788,7 +788,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { param->pTrans = pJob->conn.pTrans; pMsgSendInfo->param = param; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemFree; pMsgSendInfo->fp = fp; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo}; @@ -908,7 +908,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { pDst->param = NULL; SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); - pDst->paramFreeFp = taosMemoryFree; + pDst->paramFreeFp = taosMemFree; *dst = pDst; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06773c79e3..6e3c76d34f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -369,7 +369,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId if (code == 0) { code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId); } - taosArrayDestroyP(list, taosMemoryFree); + taosArrayDestroyP(list, NULL); if (code == 0) { taosMkDir(defaultPath); @@ -4037,8 +4037,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { memset(p->buf, 0, p->len); sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); + taosArrayClearP(p->pAdd, NULL); + taosArrayClearP(p->pDel, NULL); taosHashClear(p->pSstTbl[1 - p->idx]); TdDirPtr pDir = taosOpenDir(p->buf); @@ -4088,8 +4088,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel); if (code != 0) { // dead code - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); + taosArrayClearP(p->pAdd, NULL); + taosArrayClearP(p->pDel, NULL); taosHashClear(p->pSstTbl[1 - p->idx]); p->update = 0; return code; @@ -4140,9 +4140,9 @@ void dbChkpDestroy(SDbChkp* pChkp) { taosMemoryFree(pChkp->buf); taosMemoryFree(pChkp->path); - taosArrayDestroyP(pChkp->pSST, taosMemoryFree); - taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); - taosArrayDestroyP(pChkp->pDel, taosMemoryFree); + taosArrayDestroyP(pChkp->pSST, NULL); + taosArrayDestroyP(pChkp->pAdd, NULL); + taosArrayDestroyP(pChkp->pDel, NULL); taosHashCleanup(pChkp->pSstTbl[0]); taosHashCleanup(pChkp->pSstTbl[1]); @@ -4237,8 +4237,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { taosCloseFile(&pFile); // clear delta data buf - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); + taosArrayClearP(p->pAdd, NULL); + taosArrayClearP(p->pDel, NULL); code = 0; _ERROR: diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 78bd5c4511..a812735e3e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -616,7 +616,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId); } - taosArrayDestroyP(toDelFiles, taosMemoryFree); + taosArrayDestroyP(toDelFiles, NULL); double el = (taosGetTimestampMs() - now) / 1000.0; if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 83e73e8c88..78edcf1312 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -71,7 +71,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); if (pReq->data == NULL || pReq->dataLen == NULL) { - taosArrayDestroyP(pReq->data, taosMemoryFree); + taosArrayDestroyP(pReq->data, NULL); taosArrayDestroy(pReq->dataLen); return TSDB_CODE_OUT_OF_MEMORY; } @@ -226,7 +226,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { for (int32_t i = 0; i < numOfVgroups; i++) { - taosArrayDestroyP(pReq[i].data, taosMemoryFree); + taosArrayDestroyP(pReq[i].data, NULL); taosArrayDestroy(pReq[i].dataLen); } diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index ac67742ef5..576fb19f66 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -266,7 +266,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { } void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { - taosArrayDestroyP(pReq->data, taosMemoryFree); + taosArrayDestroyP(pReq->data, NULL); taosArrayDestroy(pReq->dataLen); } diff --git a/source/os/CMakeLists.txt b/source/os/CMakeLists.txt index 6438ce7ed0..b8362bbd3c 100644 --- a/source/os/CMakeLists.txt +++ b/source/os/CMakeLists.txt @@ -8,6 +8,7 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/contrib/pthread" PUBLIC "${TD_SOURCE_DIR}/contrib/iconv" PUBLIC "${TD_SOURCE_DIR}/contrib/msvcregex" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) # iconv if(TD_WINDOWS) diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 2852abbf50..022e802e1a 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -313,7 +313,7 @@ void *taosMemRealloc(void *ptr, int64_t size) { #endif } -char *taosStrdup(const char *ptr) { +char *taosStrdupi(const char *ptr) { #ifdef USE_TD_MEMORY if (ptr == NULL) return NULL; diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index acd63db984..09616d3e5b 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -13,14 +13,15 @@ * along with this program. If not, see . */ -#ifndef _TD_UTIL_MEMPOOL_INT_H_ -#define _TD_UTIL_MEMPOOL_INT_H_ +#ifndef _TD_MEMPOOL_INT_H_ +#define _TD_MEMPOOL_INT_H_ #ifdef __cplusplus extern "C" { #endif #include "os.h" +#include "tlockfree.h" #define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000 #define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500 @@ -263,4 +264,4 @@ enum { } #endif -#endif /* _TD_UTIL_MEMPOOL_INT_H_ */ +#endif /* _TD_MEMPOOL_INT_H_ */ diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index d9686d77f8..13bb400c3b 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -380,8 +380,14 @@ void taosArrayClearP(SArray* pArray, void (*fp)(void*)) { // fp(TARRAY_GET_ELEM(pArray, i)); // } if (pArray) { - for (int32_t i = 0; i < pArray->size; i++) { - fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + if (NULL == fp) { + for (int32_t i = 0; i < pArray->size; i++) { + taosMemoryFree(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + } else { + for (int32_t i = 0; i < pArray->size; i++) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } } } taosArrayClear(pArray); @@ -398,8 +404,14 @@ void* taosArrayDestroy(SArray* pArray) { void taosArrayDestroyP(SArray* pArray, FDelete fp) { if (pArray) { - for (int32_t i = 0; i < pArray->size; i++) { - fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + if (NULL == fp) { + for (int32_t i = 0; i < pArray->size; i++) { + taosMemoryFree(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + } else { + for (int32_t i = 0; i < pArray->size; i++) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } } taosArrayDestroy(pArray); } @@ -544,4 +556,4 @@ void taosArraySwap(SArray* a, SArray* b) { void* data = a->pData; a->pData = b->pData; b->pData = data; -} \ No newline at end of file +} diff --git a/source/util/src/theap.c b/source/util/src/theap.c index 315ddf9367..a2239ff2e9 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -212,7 +212,7 @@ void destroyPriorityQueue(PriorityQueue* pq) { if (pq->deleteFn) taosArrayDestroyP(pq->container, pq->deleteFn); else - taosArrayDestroy(pq->container); + taosArrayDestroyP(pq->container, NULL); taosMemoryFree(pq); } @@ -299,7 +299,11 @@ PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) void taosPQPop(PriorityQueue* pq) { PriorityQueueNode* top = taosPQTop(pq); - if (pq->deleteFn) pq->deleteFn(top->data); + if (pq->deleteFn) { + pq->deleteFn(top->data); + } else { + taosMemoryFree(top->data); + } pqRemove(pq, 0); } @@ -335,7 +339,11 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { void* p = top->data; top->data = n->data; n->data = p; - if (q->queue->deleteFn) q->queue->deleteFn(n->data); + if (q->queue->deleteFn) { + q->queue->deleteFn(n->data); + } else { + taosMemoryFree(n->data); + } } return pqHeapify(q->queue, 0, taosBQSize(q)); } else { diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index d2b1b83d14..31b6810a83 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "tmempool.h" +#include "osMemPool.h" #include "tmempoolInt.h" #include "tlog.h" #include "tutil.h" @@ -22,8 +22,8 @@ static SArray* gMPoolList = NULL; static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; static TdThreadMutex gMPoolMutex; -static threadlocal void* threadPoolHandle = NULL; -static threadlocal void* threadPoolSession = NULL; +threadlocal void* threadPoolHandle = NULL; +threadlocal void* threadPoolSession = NULL; int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { @@ -120,7 +120,7 @@ _return: int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { SMPChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pChunk)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pChunk)); pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); if (NULL == pChunk->pMemStart) { @@ -137,7 +137,7 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) { SMPNSChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, &pChunk)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, (void**)&pChunk)); pChunk->pMemStart = taosMemMalloc(chunkSize); if (NULL == pChunk->pMemStart) { @@ -156,10 +156,9 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { - SMPCacheGroup* pGrp = NULL; SMPChunk* pChunk = NULL; for (int32_t i = 0; i < num; ++i) { - MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk)); + MP_ERR_RET(memPoolNewChunk(pPool, &pChunk)); if (NULL == pPool->readyChunkTail) { pPool->readyChunkHead = pChunk; @@ -221,7 +220,7 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->NSChunkCache, NULL)); MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->sessionCache, NULL)); - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pPool->readyChunkHead)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pPool->readyChunkHead)); pPool->readyChunkTail = pPool->readyChunkHead; MP_ERR_RET(memPoolEnsureChunks(pPool)); @@ -252,7 +251,7 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { return TSDB_CODE_SUCCESS; } - MP_RET(memPoolNewChunk(pPool, NULL, ppChunk)); + MP_RET(memPoolNewChunk(pPool, ppChunk)); } int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) { @@ -275,6 +274,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_ } void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { + int32_t code = TSDB_CODE_SUCCESS; SMPChunk* pChunk = NULL, *preSrcChunk = NULL; void* pRes = NULL; int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); @@ -318,6 +318,7 @@ _return: } void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { + int32_t code = TSDB_CODE_SUCCESS; SMPNSChunk* pChunk = NULL; void* pRes = NULL; int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); @@ -387,7 +388,7 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fil } int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) { - SMPMemHeader* pHeader = (char)ptr - sizeof(SMPMemHeader); + SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; return pHeader->size; } @@ -410,7 +411,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - SMemPool* pPool = taosMemoryCalloc(1, sizeof(SMemPool)); + pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool)); if (NULL == pPool) { uError("calloc memory pool failed"); MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -447,7 +448,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; - MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, &pSession)); + MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead)); @@ -474,7 +475,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { - uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size); + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -493,7 +494,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, - __FUNC__, poolHandle, session, fileName, num, size); + __FUNCTION__, poolHandle, session, fileName, num, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -520,7 +521,7 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, - __FUNC__, poolHandle, session, fileName, size); + __FUNCTION__, poolHandle, session, fileName, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -560,7 +561,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", - __FUNC__, poolHandle, session, fileName, ptr); + __FUNCTION__, poolHandle, session, fileName, ptr); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -585,7 +586,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", - __FUNC__, poolHandle, session, fileName, ptr); + __FUNCTION__, poolHandle, session, fileName, ptr); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -598,11 +599,11 @@ _return: return; } -int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { +int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName) { - uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%p", - __FUNC__, poolHandle, session, fileName, size); + uError("%s invalid input param, handle:%p, session:%p, fileName:%p", + __FUNCTION__, poolHandle, session, fileName); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -612,7 +613,7 @@ int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - MP_RET(memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo)); + return memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo); _return: @@ -624,7 +625,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, - __FUNC__, poolHandle, session, fileName, alignment, size); + __FUNCTION__, poolHandle, session, fileName, alignment, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } @@ -645,4 +646,17 @@ void taosMemPoolModDestroy(void) { } +void taosAutoMemoryFree(void *ptr) { + if (NULL != threadPoolHandle) { + taosMemPoolFree(threadPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__); + } else { + taosMemFree(ptr); + } +} + +void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo) { + +} + + diff --git a/source/util/test/arrayTest.cpp b/source/util/test/arrayTest.cpp index 307719b984..95468eecaa 100644 --- a/source/util/test/arrayTest.cpp +++ b/source/util/test/arrayTest.cpp @@ -182,8 +182,8 @@ TEST(arrayTest, check_duplicate_needfree) { ASSERT_STREQ(v, value); } - taosArrayClearP(pa, taosMemoryFree); - taosArrayDestroyP(pa, taosMemoryFree); + taosArrayClearP(pa, NULL); + taosArrayDestroyP(pa, NULL); } // over all