enh: add query memory pool

This commit is contained in:
dapan1121 2024-07-01 17:22:43 +08:00
parent 2a4ba7eafd
commit b3eb938527
40 changed files with 246 additions and 136 deletions

View File

@ -29,6 +29,9 @@ extern "C" {
#define DS_BUF_FULL 2
#define DS_BUF_EMPTY 3
#define DS_FLAG_USE_MEMPOOL (1 << 0)
struct SSDataBlock;
typedef struct SDeleterRes {
@ -131,6 +134,9 @@ void dsScheduleProcess(void* ahandle, void* pItem);
*/
void dsDestroyDataSinker(DataSinkHandle handle);
int32_t dsGetSinkFlags(DataSinkHandle handle, uint64_t* pFlags);
#ifdef __cplusplus
}
#endif

View File

@ -109,6 +109,7 @@ extern "C" {
#include "osLz4.h"
#include "osMath.h"
#include "osMemory.h"
#include "osMemPool.h"
#include "osRand.h"
#include "osSemaphore.h"
#include "osSignal.h"

View File

@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MEMPOOL_H_
#define _TD_UTIL_MEMPOOL_H_
#ifndef _TD_OS_MEMPOOL_H_
#define _TD_OS_MEMPOOL_H_
#include "os.h"
@ -50,6 +50,14 @@ void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo);
void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void);
void taosAutoMemoryFree(void *ptr);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession);
void taosMemPoolDestroySession(void* session);
extern threadlocal void* threadPoolHandle;
extern threadlocal void* threadPoolSession;
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0)
#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL)
@ -76,4 +84,4 @@ void taosMemPoolModDestroy(void);
}
#endif
#endif /*_TD_UTIL_MEMPOOL_H_*/
#endif /*_TD_OS_MEMPOOL_H_*/

View File

@ -36,11 +36,6 @@ extern "C" {
#endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
// #define taosMemoryMalloc malloc
// #define taosMemoryCalloc calloc
// #define taosMemoryRealloc realloc
// #define taosMemoryFree free
int32_t taosMemoryDbgInit();
int32_t taosMemoryDbgInitRestore();
void *taosMemMalloc(int64_t size);
@ -61,6 +56,7 @@ void *taosMemMallocAlign(uint32_t alignment, int64_t size);
} \
} while (0)
#include "osMemPool.h"
#ifdef __cplusplus
}

View File

@ -1215,7 +1215,7 @@ static void *hbThreadFunc(void *param) {
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = taosMemoryMalloc(sizeof(int32_t));
*(int32_t *)pInfo->param = i;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->paramFreeFp = taosMemFree;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;

View File

@ -124,7 +124,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
pInfo->msgType = TDMT_MND_STATIS;
// pInfo->param = taosMemoryMalloc(sizeof(int32_t));
// *(int32_t*)pInfo->param = i;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->paramFreeFp = taosMemFree;
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;

View File

@ -423,7 +423,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
void tmq_list_destroy(tmq_list_t* list) {
if (list == NULL) return;
SArray* container = &list->container;
taosArrayDestroyP(container, taosMemoryFree);
taosArrayDestroyP(container, NULL);
}
int32_t tmq_list_get_size(const tmq_list_t* list) {
@ -509,7 +509,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
@ -864,7 +864,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->paramFreeFp = taosMemFree;
sendInfo->param = taosMemoryMalloc(sizeof(int64_t));
*(int64_t*)sendInfo->param = refId;
sendInfo->fp = tmqHbCb;
@ -1315,7 +1315,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
FAIL:
taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosArrayDestroyP(req.topicNames, NULL);
return code;
}
@ -3184,7 +3184,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->paramFreeFp = taosMemFree;
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

View File

@ -9447,11 +9447,11 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) {
static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp;
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
taosArrayDestroyP(pRsp->blockData, NULL);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
taosArrayDestroyP(pRsp->blockTbName, NULL);
pRsp->blockTbName = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
@ -9500,7 +9500,7 @@ void tDeleteSTaosxRsp(void *rsp) {
STaosxRsp *pRsp = (STaosxRsp *)rsp;
pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen);
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
taosArrayDestroyP(pRsp->createTableReq, NULL);
pRsp->createTableReq = NULL;
}
@ -10787,7 +10787,7 @@ int32_t tSemiDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) {
void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) {
taosMemoryFreeClear(pRsp->pMetaBuff);
taosArrayDestroyP(pRsp->batchMetaReq, taosMemoryFree);
taosArrayDestroyP(pRsp->batchMetaReq, NULL);
taosArrayDestroy(pRsp->batchMetaLen);
pRsp->batchMetaReq = NULL;
pRsp->batchMetaLen = NULL;

View File

@ -696,7 +696,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
_over:
mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
taosArrayDestroyP(subscribe.topicNames, NULL);
return code;
}

View File

@ -317,10 +317,10 @@ END:
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
if (pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->currentTopics, NULL);
taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
taosArrayDestroyP(pConsumer->assignedTopics, NULL);
}
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {

View File

@ -3952,7 +3952,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
taosHashCleanup(p->pDbMap);
}
if (p->pResTbNames) {
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
taosArrayDestroyP(p->pResTbNames, NULL);
}
if (p->pTsmaMap) {
void *pIter = taosHashIterate(p->pTsmaMap, NULL);

View File

@ -925,7 +925,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb;
arg->fid = fset->fid;
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL);
code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemFree, arg, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true;
}

View File

@ -1175,7 +1175,7 @@ _exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
taosArrayDestroyP(tbNames, taosMemoryFree);
taosArrayDestroyP(tbNames, NULL);
return rcode;
}

View File

@ -724,7 +724,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@ -778,7 +778,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_DNODE_LIST;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
@ -829,7 +829,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
@ -881,7 +881,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
@ -936,7 +936,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
@ -991,7 +991,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
@ -1048,7 +1048,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
@ -1103,7 +1103,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user);
@ -1163,7 +1163,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const
int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
@ -1226,7 +1226,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
@ -1295,7 +1295,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_VND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
@ -1360,7 +1360,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t reqType = TDMT_MND_TABLE_CFG;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFName);
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
@ -1412,7 +1412,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_SERVER_VERSION;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
qDebug("try to get svr ver from mnode");
@ -1463,7 +1463,7 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_VIEW_META;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
@ -1517,7 +1517,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
char* msg = NULL;
int32_t msgLen = 0;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(name, tbFName);
@ -1575,7 +1575,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTbName, tbFName);
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemMalloc : (MallocType)rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId,

View File

@ -39,6 +39,7 @@ typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, i
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
typedef int32_t (*FGetSinkFlags)(struct SDataSinkHandle* pHandle, uint64_t* flags);
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
@ -48,6 +49,7 @@ typedef struct SDataSinkHandle {
FGetDataBlock fGetData;
FDestroyDataSinker fDestroy;
FGetCacheSize fGetCacheSize;
FGetSinkFlags fGetFlags;
} SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);

View File

@ -50,6 +50,7 @@ typedef struct SDataDeleterHandle {
bool queryEnd;
uint64_t useconds;
uint64_t cachedSize;
uint64_t flags;
TdThreadMutex mutex;
} SDataDeleterHandle;
@ -239,6 +240,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS;
}
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
*pFlags = atomic_load_64(&pDispatcher->flags);
return TSDB_CODE_SUCCESS;
}
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam) {
int32_t code = TSDB_CODE_SUCCESS;
@ -257,6 +267,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
deleter->sink.fGetData = getDataBlock;
deleter->sink.fDestroy = destroyDataSinker;
deleter->sink.fGetCacheSize = getCacheSize;
deleter->sink.fGetFlags = getSinkFlags;
deleter->pManager = pManager;
deleter->pDeleter = pDeleterNode;
deleter->pSchema = pDataSink->pInputDataBlockDesc;
@ -276,6 +287,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
deleter->flags = DS_FLAG_USE_MEMPOOL;
*pHandle = deleter;
return code;

View File

@ -49,6 +49,7 @@ typedef struct SDataDispatchHandle {
bool queryEnd;
uint64_t useconds;
uint64_t cachedSize;
uint64_t flags;
void* pCompressBuf;
int32_t bufSize;
TdThreadMutex mutex;
@ -290,6 +291,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS;
}
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
*pFlags = atomic_load_64(&pDispatcher->flags);
return TSDB_CODE_SUCCESS;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
@ -304,7 +314,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->sink.fGetCacheSize = getCacheSize;
dispatcher->sink.fGetFlags = getSinkFlags;
dispatcher->pManager = pManager;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->status = DS_BUF_EMPTY;
@ -318,6 +328,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
goto _return;
}
dispatcher->flags = DS_FLAG_USE_MEMPOOL;
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;

View File

@ -45,6 +45,7 @@ typedef struct SDataInserterHandle {
bool fullOrderColList;
uint64_t useconds;
uint64_t cachedSize;
uint64_t flags;
TdThreadMutex mutex;
tsem_t ready;
bool explain;
@ -113,7 +114,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int
pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = msgLen;
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
@ -396,6 +397,13 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS;
}
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
*pFlags = atomic_load_64(&pDispatcher->flags);
return TSDB_CODE_SUCCESS;
}
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam) {
SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
@ -412,6 +420,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->sink.fGetData = NULL;
inserter->sink.fDestroy = destroyDataSinker;
inserter->sink.fGetCacheSize = getCacheSize;
inserter->sink.fGetFlags = getSinkFlags;
inserter->pManager = pManager;
inserter->pNode = pInserterNode;
inserter->pParam = pParam;

View File

@ -100,3 +100,9 @@ void dsDestroyDataSinker(DataSinkHandle handle) {
pHandleImpl->fDestroy(pHandleImpl);
taosMemoryFree(pHandleImpl);
}
int32_t dsGetSinkFlags(DataSinkHandle handle, uint64_t* pFlags) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetFlags(pHandleImpl, pFlags);
}

View File

@ -486,7 +486,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pDataInfo->startTime = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
SFetchRspHandleWrapper* pWrapper = taosMemCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
@ -496,7 +496,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
(*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
} else {
SResFetchReq req = {0};
req.header.vgId = pSource->addr.nodeId;
@ -511,7 +511,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pDataInfo->pSrcUidList = NULL;
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
return pTaskInfo->code;
}
}
@ -519,7 +519,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
@ -527,14 +527,14 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
taosMemoryFree(msg);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
@ -547,17 +547,17 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pSource->execId, pExchangeInfo, sourceIndex, totalSources);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(msg);
taosMemoryFree(pWrapper);
taosMemFree(pWrapper);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return pTaskInfo->code;
}
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = pSource->fetchMsgType;

View File

@ -671,7 +671,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime
if (pOperatorInfo->limit == 0) return true;
if (pOperatorInfo->pBQ == NULL) {
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, NULL, pOperatorInfo);
}
bool shouldFilter = false;

View File

@ -845,7 +845,7 @@ void nodesDestroyNode(SNode* pNode) {
taosMemoryFreeClear(pReal->pMeta);
taosMemoryFreeClear(pReal->pVgroupList);
taosArrayDestroyEx(pReal->pSmaIndexes, destroySmaIndex);
taosArrayDestroyP(pReal->tsmaTargetTbVgInfo, taosMemoryFree);
taosArrayDestroyP(pReal->tsmaTargetTbVgInfo, NULL);
taosArrayDestroy(pReal->tsmaTargetTbInfo);
break;
}
@ -1365,7 +1365,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam);
taosArrayDestroyP(pLogicNode->pTsmaTargetTbVgInfo, taosMemoryFree);
taosArrayDestroyP(pLogicNode->pTsmaTargetTbVgInfo, NULL);
taosArrayDestroy(pLogicNode->pTsmaTargetTbInfo);
break;
}

View File

@ -45,7 +45,7 @@ int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placehol
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt;
initAstCreateContext(pParseCxt, &cxt);
void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc);
void* pParser = ParseAlloc((FMalloc)taosMemMalloc);
int32_t i = 0;
while (1) {
SToken t0 = {0};
@ -86,7 +86,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
}
abort_parse:
ParseFree(pParser, (FFree)taosMemoryFree);
ParseFree(pParser, (FFree)taosMemFree);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
int32_t code = buildQueryAfterParse(pQuery, cxt.pRootNode, cxt.placeholderNo, &cxt.pPlaceholderValues);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -3766,7 +3766,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
if (TSDB_CODE_SUCCESS == code && pRealTable->pTsmas &&
(pRealTable->pMeta->tableType == TSDB_CHILD_TABLE || pRealTable->pMeta->tableType == TSDB_NORMAL_TABLE)) {
if (pRealTable->tsmaTargetTbVgInfo) {
taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, taosMemoryFree);
taosArrayDestroyP(pRealTable->tsmaTargetTbVgInfo, NULL);
pRealTable->tsmaTargetTbVgInfo = NULL;
}
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
@ -5835,7 +5835,7 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
taosMemoryFree(vgsInfo);
}
}
taosArrayDestroyP(pTbNames, taosMemoryFree);
taosArrayDestroyP(pTbNames, NULL);
if (code) break;
}
}

View File

@ -141,6 +141,7 @@ typedef struct SQWTaskCtx {
bool queryExecDone;
bool queryInQueue;
bool explainRsped;
bool sinkWithMemPool;
int32_t rspCode;
int64_t affectedRows; // for insert ...select stmt
@ -227,6 +228,17 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS()
extern void* gQueryPoolHandle;
#define QW_SINK_ENABLE_MEMPOOL(_ctx) \
do { \
if ((_ctx)->sinkWithMemPool) { \
taosEnableMemoryPoolUsage(gQueryPoolHandle, (_ctx)->memPoolSession); \
} \
} while (0)
#define QW_SINK_DISABLE_MEMPOOL() taosDisableMemoryPoolUsage()
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
@ -418,6 +430,7 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);
void qwDbgSimulateSleep(void);
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwInitQueryPool(void);
#ifdef __cplusplus
}

View File

@ -15,7 +15,7 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
return TSDB_CODE_SUCCESS;
}
int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int64_t* chunkSize) {
int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) {
*chunkSize = 2 * 1048576;
return TSDB_CODE_SUCCESS;

View File

@ -308,9 +308,9 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
qwFreeTaskHandle(&ctx->taskHandle);
if (ctx->sinkHandle) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsDestroyDataSinker(ctx->sinkHandle);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
ctx->sinkHandle = NULL;
qDebug("sink handle destroyed");

View File

@ -18,7 +18,7 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0,
};
static TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
@ -103,9 +103,9 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
}
if (!ctx->needFetch) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
}
}
@ -174,9 +174,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
SSDataBlock *pRes = taosArrayGetP(pResList, j);
SInputData inputData = {.pData = pRes};
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
@ -204,9 +205,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
ctx->queryExecDone = true;
}
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsEndPut(sinkHandle, useconds);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (queryStop) {
*queryStop = true;
@ -312,9 +313,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*pRawDataLen = 0;
while (true) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
@ -323,9 +324,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (len == 0) {
if (queryEnd) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
@ -369,9 +370,9 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
((int32_t *)output.pData)[1] = rawLen;
output.pData += sizeof(int32_t) * 2;
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
@ -422,9 +423,9 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
int32_t code = 0;
SOutputData output = {0};
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
@ -436,9 +437,9 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
@ -514,9 +515,9 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
ctx->queryEnd = false;
#endif
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
QW_SINK_ENABLE_MEMPOOL(ctx);
dsReset(ctx->sinkHandle);
taosDisableMemoryPoolUsage();
QW_SINK_DISABLE_MEMPOOL();
qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg);
@ -788,8 +789,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
}
uint64_t flags = 0;
dsGetSinkFlags(sinkHandle, &flags);
ctx->level = plan->level;
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo);
ctx->sinkWithMemPool = flags & DS_FLAG_USE_MEMPOOL;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
@ -1273,6 +1278,11 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
ctx.taskHandle = pTaskInfo;
ctx.sinkHandle = sinkHandle;
uint64_t flags = 0;
dsGetSinkFlags(sinkHandle, &flags);
ctx.sinkWithMemPool = flags & DS_FLAG_USE_MEMPOOL;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, pRes));

View File

@ -617,7 +617,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msgSendInfo->paramFreeFp = taosMemoryFree;
msgSendInfo->paramFreeFp = taosMemFree;
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
@ -788,7 +788,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemFree;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
@ -908,7 +908,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
pDst->paramFreeFp = taosMemoryFree;
pDst->paramFreeFp = taosMemFree;
*dst = pDst;

View File

@ -369,7 +369,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
if (code == 0) {
code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId);
}
taosArrayDestroyP(list, taosMemoryFree);
taosArrayDestroyP(list, NULL);
if (code == 0) {
taosMkDir(defaultPath);
@ -4037,8 +4037,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosArrayClearP(p->pAdd, NULL);
taosArrayClearP(p->pDel, NULL);
taosHashClear(p->pSstTbl[1 - p->idx]);
TdDirPtr pDir = taosOpenDir(p->buf);
@ -4088,8 +4088,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
if (code != 0) {
// dead code
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosArrayClearP(p->pAdd, NULL);
taosArrayClearP(p->pDel, NULL);
taosHashClear(p->pSstTbl[1 - p->idx]);
p->update = 0;
return code;
@ -4140,9 +4140,9 @@ void dbChkpDestroy(SDbChkp* pChkp) {
taosMemoryFree(pChkp->buf);
taosMemoryFree(pChkp->path);
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
taosArrayDestroyP(pChkp->pSST, NULL);
taosArrayDestroyP(pChkp->pAdd, NULL);
taosArrayDestroyP(pChkp->pDel, NULL);
taosHashCleanup(pChkp->pSstTbl[0]);
taosHashCleanup(pChkp->pSstTbl[1]);
@ -4237,8 +4237,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
taosCloseFile(&pFile);
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosArrayClearP(p->pAdd, NULL);
taosArrayClearP(p->pDel, NULL);
code = 0;
_ERROR:

View File

@ -616,7 +616,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
}
taosArrayDestroyP(toDelFiles, taosMemoryFree);
taosArrayDestroyP(toDelFiles, NULL);
double el = (taosGetTimestampMs() - now) / 1000.0;
if (code == TSDB_CODE_SUCCESS) {

View File

@ -71,7 +71,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
if (pReq->data == NULL || pReq->dataLen == NULL) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroyP(pReq->data, NULL);
taosArrayDestroy(pReq->dataLen);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -226,7 +226,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
for (int32_t i = 0; i < numOfVgroups; i++) {
taosArrayDestroyP(pReq[i].data, taosMemoryFree);
taosArrayDestroyP(pReq[i].data, NULL);
taosArrayDestroy(pReq[i].dataLen);
}

View File

@ -266,7 +266,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
}
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroyP(pReq->data, NULL);
taosArrayDestroy(pReq->dataLen);
}

View File

@ -8,6 +8,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/contrib/pthread"
PUBLIC "${TD_SOURCE_DIR}/contrib/iconv"
PUBLIC "${TD_SOURCE_DIR}/contrib/msvcregex"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
# iconv
if(TD_WINDOWS)

View File

@ -313,7 +313,7 @@ void *taosMemRealloc(void *ptr, int64_t size) {
#endif
}
char *taosStrdup(const char *ptr) {
char *taosStrdupi(const char *ptr) {
#ifdef USE_TD_MEMORY
if (ptr == NULL) return NULL;

View File

@ -13,14 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MEMPOOL_INT_H_
#define _TD_UTIL_MEMPOOL_INT_H_
#ifndef _TD_MEMPOOL_INT_H_
#define _TD_MEMPOOL_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tlockfree.h"
#define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500
@ -263,4 +264,4 @@ enum {
}
#endif
#endif /* _TD_UTIL_MEMPOOL_INT_H_ */
#endif /* _TD_MEMPOOL_INT_H_ */

View File

@ -380,10 +380,16 @@ void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
// fp(TARRAY_GET_ELEM(pArray, i));
// }
if (pArray) {
if (NULL == fp) {
for (int32_t i = 0; i < pArray->size; i++) {
taosMemoryFree(*(void**)TARRAY_GET_ELEM(pArray, i));
}
} else {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
}
}
taosArrayClear(pArray);
}
@ -398,9 +404,15 @@ void* taosArrayDestroy(SArray* pArray) {
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
if (pArray) {
if (NULL == fp) {
for (int32_t i = 0; i < pArray->size; i++) {
taosMemoryFree(*(void**)TARRAY_GET_ELEM(pArray, i));
}
} else {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
}
taosArrayDestroy(pArray);
}
}

View File

@ -212,7 +212,7 @@ void destroyPriorityQueue(PriorityQueue* pq) {
if (pq->deleteFn)
taosArrayDestroyP(pq->container, pq->deleteFn);
else
taosArrayDestroy(pq->container);
taosArrayDestroyP(pq->container, NULL);
taosMemoryFree(pq);
}
@ -299,7 +299,11 @@ PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node)
void taosPQPop(PriorityQueue* pq) {
PriorityQueueNode* top = taosPQTop(pq);
if (pq->deleteFn) pq->deleteFn(top->data);
if (pq->deleteFn) {
pq->deleteFn(top->data);
} else {
taosMemoryFree(top->data);
}
pqRemove(pq, 0);
}
@ -335,7 +339,11 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
void* p = top->data;
top->data = n->data;
n->data = p;
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
if (q->queue->deleteFn) {
q->queue->deleteFn(n->data);
} else {
taosMemoryFree(n->data);
}
}
return pqHeapify(q->queue, 0, taosBQSize(q));
} else {

View File

@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "tmempool.h"
#include "osMemPool.h"
#include "tmempoolInt.h"
#include "tlog.h"
#include "tutil.h"
@ -22,8 +22,8 @@
static SArray* gMPoolList = NULL;
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
static TdThreadMutex gMPoolMutex;
static threadlocal void* threadPoolHandle = NULL;
static threadlocal void* threadPoolSession = NULL;
threadlocal void* threadPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL;
int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
@ -120,7 +120,7 @@ _return:
int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pChunk));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pChunk));
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
if (NULL == pChunk->pMemStart) {
@ -137,7 +137,7 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) {
SMPNSChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, &pChunk));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, (void**)&pChunk));
pChunk->pMemStart = taosMemMalloc(chunkSize);
if (NULL == pChunk->pMemStart) {
@ -156,10 +156,9 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi
int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) {
SMPCacheGroup* pGrp = NULL;
SMPChunk* pChunk = NULL;
for (int32_t i = 0; i < num; ++i) {
MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk));
MP_ERR_RET(memPoolNewChunk(pPool, &pChunk));
if (NULL == pPool->readyChunkTail) {
pPool->readyChunkHead = pChunk;
@ -221,7 +220,7 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->NSChunkCache, NULL));
MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->sessionCache, NULL));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pPool->readyChunkHead));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pPool->readyChunkHead));
pPool->readyChunkTail = pPool->readyChunkHead;
MP_ERR_RET(memPoolEnsureChunks(pPool));
@ -252,7 +251,7 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) {
return TSDB_CODE_SUCCESS;
}
MP_RET(memPoolNewChunk(pPool, NULL, ppChunk));
MP_RET(memPoolNewChunk(pPool, ppChunk));
}
int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) {
@ -275,6 +274,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_
}
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
int32_t code = TSDB_CODE_SUCCESS;
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
@ -318,6 +318,7 @@ _return:
}
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
int32_t code = TSDB_CODE_SUCCESS;
SMPNSChunk* pChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
@ -387,7 +388,7 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fil
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) {
SMPMemHeader* pHeader = (char)ptr - sizeof(SMPMemHeader);
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
}
@ -410,7 +411,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
SMemPool* pPool = taosMemoryCalloc(1, sizeof(SMemPool));
pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool));
if (NULL == pPool) {
uError("calloc memory pool failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@ -447,7 +448,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, &pSession));
MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead));
@ -474,7 +475,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size);
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -493,7 +494,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64,
__FUNC__, poolHandle, session, fileName, num, size);
__FUNCTION__, poolHandle, session, fileName, num, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -520,7 +521,7 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64,
__FUNC__, poolHandle, session, fileName, size);
__FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -560,7 +561,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNC__, poolHandle, session, fileName, ptr);
__FUNCTION__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -585,7 +586,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNC__, poolHandle, session, fileName, ptr);
__FUNCTION__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -598,11 +599,11 @@ _return:
return;
}
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%p",
__FUNC__, poolHandle, session, fileName, size);
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -612,7 +613,7 @@ int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
MP_RET(memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo));
return memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo);
_return:
@ -624,7 +625,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64,
__FUNC__, poolHandle, session, fileName, alignment, size);
__FUNCTION__, poolHandle, session, fileName, alignment, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
@ -645,4 +646,17 @@ void taosMemPoolModDestroy(void) {
}
void taosAutoMemoryFree(void *ptr) {
if (NULL != threadPoolHandle) {
taosMemPoolFree(threadPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__);
} else {
taosMemFree(ptr);
}
}
void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo) {
}

View File

@ -182,8 +182,8 @@ TEST(arrayTest, check_duplicate_needfree) {
ASSERT_STREQ(v, value);
}
taosArrayClearP(pa, taosMemoryFree);
taosArrayDestroyP(pa, taosMemoryFree);
taosArrayClearP(pa, NULL);
taosArrayDestroyP(pa, NULL);
}
// over all