enh: add query memory management

This commit is contained in:
dapan1121 2024-07-04 19:28:30 +08:00
parent 1bcba8f604
commit d34112983f
9 changed files with 294 additions and 59 deletions

View File

@ -42,16 +42,19 @@ typedef enum MemPoolUsageLevel {
typedef void (*decConcSessionNum)(void);
typedef void (*incConcSessionNum)(void);
typedef void (*setConcSessionNum)(int32_t);
typedef void (*retireCollection)(uint64_t, int64_t);
typedef struct SMemPoolCallBack {
decConcSessionNum decSessFp;
incConcSessionNum incSessFp;
setConcSessionNum setSessFp;
retireCollection retireFp;
} SMemPoolCallBack;
typedef struct SMemPoolCfg {
int64_t maxSize;
int64_t sessionExpectSize;
int64_t *collectionQuota;
int32_t chunkSize;
int32_t threadNum;
int8_t usageLevel[E_MEM_USAGE_MAX_VALUE];

View File

@ -546,9 +546,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
@ -560,6 +557,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
return -1;
}
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));

View File

@ -60,6 +60,7 @@ int8_t tsQueryUseMemoryPool = 1;
int32_t tsQueryMinConcurrentTaskNum = 1;
int32_t tsQueryMaxConcurrentTaskNum = 0;
int32_t tsQueryConcurrentTaskNum = 0;
int64_t tsSingleQueryMaxMemorySize = 0;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;

View File

@ -231,11 +231,16 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx;
} SQWorkerMgmt;
typedef struct SQWQueryInfo {
void* pCollection;
SHashObj* pSessions;
} SQWQueryInfo;
typedef struct SQueryMgmt {
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
void* memPoolHandle;
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pQueryInfo;
void* memPoolHandle;
} SQueryMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
@ -446,7 +451,7 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);
void qwDbgSimulateSleep(void);
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwInitQueryPool(void);
int32_t qwInitQueryPool(void);
#ifdef __cplusplus
}

View File

@ -4,9 +4,9 @@
int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576;
if (availSize < QW_MIN_MEM_POOL_SIZE) {
return -1;
}
//if (availSize < QW_MIN_MEM_POOL_SIZE) {
// return -1;
//}
*maxSize = availSize;
@ -34,42 +34,118 @@ void qwSetConcurrentTaskNum(int32_t taskNum) {
void qwDecConcurrentTaskNum(void) {
int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel);
if (concTaskLevel < QW_CONC_TASK_LEVEL_LOW) {
if (concTaskLevel <= QW_CONC_TASK_LEVEL_LOW) {
qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel);
return;
}
//TODO
}
void qwInitQueryPool(void) {
void qwIncConcurrentTaskNum(void) {
int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel);
if (concTaskLevel >= QW_CONC_TASK_LEVEL_FULL) {
qError("Unable to increase concurrent task num, current task level:%d", concTaskLevel);
return;
}
//TODO
}
int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) {
pQuery->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pQuery->pSessions) {
qError("fail to init session hash");
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = taosMemPoolCallocCollection(qId, &pQuery->pCollection);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pQuery->pSessions);
return code;
}
return code;
}
int32_t qwInitSession(uint64_t qId, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SQWQueryInfo* pQuery = NULL;
while (true) {
pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId));
if (NULL == pQuery) {
SQWQueryInfo queryInfo = {0};
code = qwInitQueryInfo(qId, &queryInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = taosHashPut(gQueryMgmt.pQueryInfo, &qId, sizeof(qId), &queryInfo, sizeof(queryInfo));
if (TSDB_CODE_SUCCESS != code) {
qwDestroyQueryInfo(&queryInfo);
if (-2 == code) {
code = TSDB_CODE_SUCCESS;
continue;
}
return TSDB_CODE_OUT_OF_MEMORY;
}
pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId));
}
code = taosHashPut(pQuery->pSessions, ppSession, POINTER_BYTES, NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, errno:%d", terrno);
return terrno;
}
break;
}
QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, qId, ppSession, pQuery->pCollection));
return code;
}
int32_t qwInitQueryPool(void) {
int64_t memSize = 0;
int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) {
return;
return TAOS_SYSTEM_ERROR(errno);
}
SMemPoolCfg cfg = {0};
code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize);
if (TSDB_CODE_SUCCESS != code) {
return;
return code;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.collectionQuota = &tsSingleQueryMaxMemorySize;
cfg.cb.setSessFp = qwSetConcurrentTaskNum;
cfg.cb.decSessFp = qwDecConcurrentTaskNum;
cfg.cb.incSessFp = qwIncConcurrentTaskNum;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) {
return;
return code;
}
code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle);
if (TSDB_CODE_SUCCESS != code) {
return;
return code;
}
gQueryMgmt.pQueryInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == gQueryMgmt.pQueryInfo) {
qError("init query hash failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return code;
}

View File

@ -672,3 +672,8 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
qwReleaseScheduler(QW_WRITE, mgmt);
}
}
void qwDestroyQueryInfo(SQWQueryInfo* pQuery) {
//TODO
}

View File

@ -734,7 +734,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->phase = -1;
if (NULL != gQueryMgmt.memPoolHandle) {
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, &ctx->memPoolSession));
QW_ERR_JRET(qwInitSession(qId, &ctx->memPoolSession));
}
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
@ -1297,13 +1297,16 @@ _return:
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
taosThreadOnce(&gQueryPoolInit, qwInitQueryPool);
if (NULL == gQueryMgmt.memPoolHandle) {
QW_ERR_RET(qwInitQueryPool());
}
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
if (1 == qwNum) {
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));

View File

@ -31,6 +31,9 @@ extern "C" {
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF
#define MP_RETIRE_THRESHOLD_PERCENT (0.9)
#define MP_RETIRE_UNIT_PERCENT (0.1)
// FLAGS AREA
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
@ -186,11 +189,16 @@ typedef struct SMPStatInfo {
typedef struct SMPSession {
SMPListNode list;
int64_t sessionId;
SMPCollection* pCollection;
bool needRetire;
SMPCtrlInfo ctrlInfo;
int64_t allocChunkNum;
int64_t allocChunkMemSize;
int64_t allocMemSize;
int64_t maxAllocMemSize;
int64_t reUseChunkNum;
int32_t srcChunkNum;
@ -222,12 +230,20 @@ typedef struct SMPCacheGroupInfo {
void *pIdleList;
} SMPCacheGroupInfo;
typedef struct SMPCollection {
int64_t collectionId;
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPStatInfo stat;
} SMPCollection;
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int64_t memRetireThreshold;
int64_t memRetireUnit;
int32_t maxChunkNum;
SMPCtrlInfo ctrlInfo;
@ -238,6 +254,7 @@ typedef struct SMemPool {
int64_t allocNSChunkNum;
int64_t allocNSChunkSize;
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPCacheGroupInfo chunkCache;
SMPCacheGroupInfo NSChunkCache;
@ -258,11 +275,17 @@ typedef struct SMemPool {
SMPStatInfo stat;
} SMemPool;
typedef enum EMPMemStrategy {
E_MP_STRATEGY_DIRECT = 1,
E_MP_STRATEGY_CHUNK,
} EMPMemStrategy;
typedef struct SMemPoolMgmt {
SArray* poolList;
TdThreadMutex poolMutex;
TdThread poolMgmtThread;
int32_t code;
EMPMemStrategy strategy;
SArray* poolList;
TdThreadMutex poolMutex;
TdThread poolMgmtThread;
int32_t code;
} SMemPoolMgmt;
#define MP_GET_FLAG(st, f) ((st) & (f))

View File

@ -19,9 +19,7 @@
#include "tlog.h"
#include "tutil.h"
static SArray* gMPoolList = NULL;
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
static TdThreadMutex gMPoolMutex;
threadlocal void* threadPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL;
SMemPoolMgmt gMPMgmt;
@ -64,7 +62,7 @@ int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCache
}
pGrp->nodesNum = pInfo->groupNum;
pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize);
pGrp->pNodes = taosMemCalloc(pGrp->nodesNum, pInfo->nodeSize);
if (NULL == pGrp->pNodes) {
uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
@ -124,7 +122,7 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
if (NULL == pChunk->pMemStart) {
uError("add new chunk, memory malloc %d failed", pPool->cfg.chunkSize);
uError("add new chunk, memory malloc %d failed since %s", pPool->cfg.chunkSize, strerror(errno));
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -143,7 +141,7 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi
pChunk->pMemStart = taosMemMalloc(chunkSize);
if (NULL == pChunk->pMemStart) {
uError("add new chunk, memory malloc %" PRId64 " failed", chunkSize);
uError("add new NS chunk, memory malloc %" PRId64 " failed since %s", chunkSize, strerror(errno));
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -200,6 +198,8 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pPool->memRetireThreshold = pPool->cfg.maxSize * MP_RETIRE_THRESHOLD_PERCENT;
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT
pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize;
if (pPool->maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize);
@ -293,7 +293,7 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size,
pSession->allocChunkNum++;
pSession->allocChunkMemSize += pPool->cfg.chunkSize;
pSession->allocMemSize += totalSize;
memPoolAddSessionAllocMemSize(pSession, totalSize);
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
@ -336,7 +336,7 @@ void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t siz
pSession->allocChunkNum++;
pSession->allocChunkMemSize += totalSize;
pSession->allocMemSize += totalSize;
memPoolAddSessionAllocMemSize(pSession, totalSize);
if (NULL == pSession->inUseNSChunkHead) {
pSession->inUseNSChunkHead = pChunk;
@ -350,17 +350,86 @@ _return:
return pRes;
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) {
int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
while (true) {
if (newSize <= maxAllocMemSize) {
break;
}
if (maxAllocMemSize == atomic_val_compare_exchange_64(pMaxAllocMemSize, maxAllocMemSize, newSize)) {
break;
}
return pHeader->size;
maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
}
}
void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t size) {
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_add_fetch_64(&pSession->pCollection->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pSession->pCollection->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize);
}
bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
if (pPool->cfg.collectionQuota > 0 && (atomic_load_64(&pSession->pCollection->allocMemSize) + size) > pPool->cfg.collectionQuota) {
return true;
}
if ((atomic_load_64(&pPool->allocMemSize) + size) >= pPool->memRetireThreshold) {
return (*pPool->cfg.cb.retireFp)(pSession->pCollection->collectionId, pPool->memRetireUnit);
}
return false;
}
void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
if (memPoolMemQuotaOverflow(pPool, pSession, size)) {
uInfo("session needs to retire memory");
return NULL;
}
void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
if (NULL != res) {
memPoolUpdateAllocMemSize(pPool, pSession, size);
}
return res;
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT:
return taosMemSize(ptr);
case E_MP_STRATEGY_CHUNK: {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
}
default:
break;
}
return 0;
}
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment);
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT:
res = memPoolAllocDirect(pPool, pSession, size, alignment);
break;
case E_MP_STRATEGY_CHUNK:
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment);
break;
default:
break;
}
_return:
@ -391,8 +460,25 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t*
return;
}
if (origSize) {
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT: {
int64_t oSize = taosMemSize(ptr);
if (origSize) {
*origSize = oSize;
}
taosMemFree(ptr);
atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
break;
}
case E_MP_STRATEGY_CHUNK:
if (origSize) {
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
}
break;
default:
break;
}
return;
@ -413,21 +499,36 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64
}
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
if (*origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
pHeader->size = size;
return ptr;
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT: {
res = taosMemRealloc(ptr, size);
if (NULL != res) {
memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize);
}
return res;
}
case E_MP_STRATEGY_CHUNK: {
if (*origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
pHeader->size = size;
return ptr;
}
res = memPoolMallocImpl(pPool, pSession, size, 0);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
memcpy(res, ptr, *origSize);
memPoolFreeImpl(pPool, pSession, ptr, NULL);
return res;
}
default:
break;
}
res = memPoolMallocImpl(pPool, pSession, size, 0);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
memcpy(res, ptr, *origSize);
memPoolFreeImpl(pPool, pSession, ptr, NULL);
return res;
return NULL;
}
void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName) {
@ -637,7 +738,8 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item,
}
void* memPoolMgmtThreadFunc(void* param) {
//TODO
return NULL;
}
void taosMemPoolModInit(void) {
@ -648,6 +750,8 @@ void taosMemPoolModInit(void) {
gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY;
return;
}
gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
@ -666,7 +770,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
SMemPool* pPool = NULL;
taosThreadOnce(&gMPoolInit, taosMemPoolModInit);
if (NULL == gMPoolList) {
if (NULL == gMPMgmt.poolList) {
uError("init memory pool failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -679,12 +783,12 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
MP_ERR_JRET(memPoolInit(pPool, poolName, cfg));
taosThreadMutexLock(&gMPoolMutex);
taosThreadMutexLock(&gMPMgmt.poolMutex);
taosArrayPush(gMPoolList, &pPool);
pPool->slotId = taosArrayGetSize(gMPoolList) - 1;
taosArrayPush(gMPMgmt.poolList, &pPool);
pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1;
taosThreadMutexUnlock(&gMPoolMutex);
taosThreadMutexUnlock(&gMPMgmt.poolMutex);
_return:
@ -708,7 +812,7 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
}
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
@ -726,6 +830,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
pSession->pCollection = (SMPCollection*)pCollection;
_return:
if (TSDB_CODE_SUCCESS != code) {
@ -933,5 +1039,17 @@ void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
}
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection) {
*ppCollection = taosMemCalloc(1, sizeof(SMPCollection))
if (NULL == *ppCollection) {
uError("calloc collection failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
SMPCollection* pCollection = (SMPCollection*)*ppCollection;
pCollection->collectionId = collectionId;
return TSDB_CODE_SUCCESS;
}