diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 96b9617fc4..f0a35b7423 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -69,6 +69,10 @@ extern float tsSelectivityRatio; extern int32_t tsTagFilterResCacheSize; // queue & threads +extern int32_t tsQueryMinConcurrentTaskNum; +extern int32_t tsQueryMaxConcurrentTaskNum; +extern int32_t tsQueryConcurrentTaskNum; +extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; extern int32_t tsTimeToGetAvailableConn; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index bd49ded309..2af0d46c19 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -24,18 +24,39 @@ extern "C" { #define MEMPOOL_MAX_CHUNK_SIZE (1 << 30) #define MEMPOOL_MIN_CHUNK_SIZE (1 << 20) -typedef enum MemPoolEvictPolicy{ +typedef enum MemPoolEvictPolicy { E_EVICT_ALL = 1, E_EVICT_NONE, E_EVICT_AUTO, E_EVICT_MAX_VALUE, // no used } MemPoolEvictPolicy; +typedef enum MemPoolUsageLevel { + E_MEM_USAGE_LOW = 0, + E_MEM_USAGE_MIDDLE, + E_MEM_USAGE_HIGH, + E_MEM_USAGE_EXTRAME, + E_MEM_USAGE_MAX_VALUE +} MemPoolUsageLevel; + +typedef void (*decConcSessionNum)(void); +typedef void (*incConcSessionNum)(void); +typedef void (*setConcSessionNum)(int32_t); + +typedef struct SMemPoolCallBack { + decConcSessionNum decSessFp; + incConcSessionNum incSessFp; + setConcSessionNum setSessFp; +} SMemPoolCallBack; + typedef struct SMemPoolCfg { int64_t maxSize; + int64_t sessionExpectSize; int32_t chunkSize; int32_t threadNum; + int8_t usageLevel[E_MEM_USAGE_MAX_VALUE]; MemPoolEvictPolicy evicPolicy; + SMemPoolCallBack cb; } SMemPoolCfg; void taosMemPoolModInit(void); @@ -52,7 +73,7 @@ void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); void taosAutoMemoryFree(void *ptr); int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession); -void taosMemPoolDestroySession(void* session); +void taosMemPoolDestroySession(void* poolHandle, void* session); extern threadlocal void* threadPoolHandle; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ba49ef929e..9d020f98ac 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -146,6 +146,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135) #define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal #define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x0137) +#define TSDB_CODE_SYSTEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0138) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 110fb23b87..45a1a64393 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -57,11 +57,16 @@ int32_t tsShellActivityTimer = 3; // second int8_t tsQueryUseMemoryPool = 1; // queue & threads +int32_t tsQueryMinConcurrentTaskNum = 1; +int32_t tsQueryMaxConcurrentTaskNum = 0; +int32_t tsQueryConcurrentTaskNum = 0; + int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; +int32_t tsNumOfQueryThreads = 0; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 16; int32_t tsNumOfMnodeQueryThreads = 16; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 885086e37a..e77b670d6e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -209,6 +209,8 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } + tsNumOfQueryThreads += tsNumOfMnodeQueryThreads; + SSingleWorkerCfg fCfg = { .min = tsNumOfMnodeFetchThreads, .max = tsNumOfMnodeFetchThreads, diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 28da0f9c5f..1c7898ec20 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -100,8 +100,8 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t qmStartWorker(SQnodeMgmt *pMgmt) { SSingleWorkerCfg queryCfg = { - .min = tsNumOfVnodeQueryThreads, - .max = tsNumOfVnodeQueryThreads, + .min = tsNumOfQnodeQueryThreads, + .max = tsNumOfQnodeQueryThreads, .name = "qnode-query", .fp = (FItem)qmProcessQueue, .param = pMgmt, @@ -112,6 +112,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } + tsNumOfQueryThreads += tsNumOfQnodeQueryThreads; + SSingleWorkerCfg fetchCfg = { .min = tsNumOfQnodeFetchThreads, .max = tsNumOfQnodeFetchThreads, diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7b7c51bbc7..b882df6ba3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -399,6 +399,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pQPool->max = tsNumOfVnodeQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; + tsNumOfQueryThreads += tsNumOfVnodeQueryThreads; + SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; pStreamPool->ratio = tsRatioOfVnodeStreamThreads; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index d4832ade40..b28254d91d 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -44,6 +44,15 @@ extern "C" { #define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576) #define QW_MIN_MEM_POOL_SIZE (256 * 1048576) +#define QW_DEFAULT_THREAD_TASK_NUM 3 + +enum { + QW_CONC_TASK_LEVEL_LOW = 1, + QW_CONC_TASK_LEVEL_MIDDLE, + QW_CONC_TASK_LEVEL_HIGH, + QW_CONC_TASK_LEVEL_FULL +}; + enum { QW_PHASE_PRE_QUERY = 1, QW_PHASE_POST_QUERY, @@ -222,18 +231,25 @@ typedef struct SQWorkerMgmt { int32_t paramIdx; } SQWorkerMgmt; +typedef struct SQueryMgmt { + SRWLatch taskMgmtLock; + int32_t concTaskLevel; + + void* memPoolHandle; +} SQueryMgmt; + #define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId #define QW_IDS() sId, qId, tId, rId, eId #define QW_FPARAMS() mgmt, QW_IDS() -extern void* gQueryPoolHandle; +extern SQueryMgmt gQueryMgmt; #define QW_SINK_ENABLE_MEMPOOL(_ctx) \ do { \ if ((_ctx)->sinkWithMemPool) { \ - taosEnableMemoryPoolUsage(gQueryPoolHandle, (_ctx)->memPoolSession); \ + taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, (_ctx)->memPoolSession); \ } \ } while (0) diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index adb5bc363e..932fb7b1fc 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -1,8 +1,6 @@ #include "qwInt.h" #include "qworker.h" -void* gQueryPoolHandle = NULL; - int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE); int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576; @@ -21,6 +19,28 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu return TSDB_CODE_SUCCESS; } +void qwSetConcurrentTaskNum(int32_t taskNum) { + int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM); + + if (tsQueryMaxConcurrentTaskNum > 0) { + finTaskNum = TMIN(taskNum, tsQueryMaxConcurrentTaskNum); + } + finTaskNum = TMAX(finTaskNum, tsQueryMinConcurrentTaskNum); + + atomic_store_32(&tsQueryConcurrentTaskNum, finTaskNum); + + atomic_store_32(&gQueryMgmt.concTaskLevel, QW_CONC_TASK_LEVEL_FULL); +} + +void qwDecConcurrentTaskNum(void) { + int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel); + if (concTaskLevel < QW_CONC_TASK_LEVEL_LOW) { + qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel); + return; + } + + +} void qwInitQueryPool(void) { int64_t memSize = 0; @@ -37,13 +57,19 @@ void qwInitQueryPool(void) { cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO + cfg.cb.setSessFp = qwSetConcurrentTaskNum; + cfg.cb.decSessFp = qwDecConcurrentTaskNum; + cfg.cb.incSessFp = qwIncConcurrentTaskNum; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); if (TSDB_CODE_SUCCESS != code) { return; } - taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryPoolHandle); + code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle); + if (TSDB_CODE_SUCCESS != code) { + return; + } } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 5d81a37156..da69bcf95d 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -276,7 +276,7 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); qDestroyTask(otaskHandle); taosDisableMemoryPoolUsage(); @@ -320,6 +320,8 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { } taosArrayDestroy(ctx->tbInfo); + + taosMemPoolDestroySession(gQueryMgmt.memPoolHandle, ctx->memPoolSession); } static void freeExplainExecItem(void *param) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index dc9795023c..a7a2e7f6aa 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -19,6 +19,8 @@ SQWorkerMgmt gQwMgmt = { }; TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; +SQueryMgmt gQueryMgmt; + int32_t qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, tId, sId; @@ -153,7 +155,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (taskHandle) { qwDbgSimulateSleep(); - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); taosDisableMemoryPoolUsage(); @@ -731,8 +733,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->sId = sId; ctx->phase = -1; - if (NULL != gQueryPoolHandle) { - QW_ERR_JRET(taosMemPoolInitSession(gQueryPoolHandle, &ctx->memPoolSession)); + if (NULL != gQueryMgmt.memPoolHandle) { + QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, &ctx->memPoolSession)); } QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); @@ -774,7 +776,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } - taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); + taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); taosDisableMemoryPoolUsage(); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 9ce6e594e9..d55fc819fd 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -22,6 +22,7 @@ extern "C" { #include "os.h" #include "tlockfree.h" +#include "thash.h" #define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000 #define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500 @@ -52,12 +53,18 @@ extern "C" { #define MP_STAT_FLAG_LOG_SOME_NODE_STAT (1 << 10) #define MP_STAT_FLAG_LOG_SOME_POOL_STAT (1 << 11) +#define MP_STAT_FLAG_LOG_ALL (0xFFFFFFFFFFFFFFFF) + + // STAT PROCESURE FLAGS #define MP_STAT_PROC_FLAG_EXEC (1 << 0) #define MP_STAT_PROC_FLAG_INPUT_ERR (1 << 1) #define MP_STAT_PROC_FLAG_RES_SUCC (1 << 2) #define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3) +// CTRL FUNC FLAGS +#define MP_CTRL_FLAG_PRINT_STAT (1 << 0) + typedef enum EMPStatLogItem { E_MP_STAT_LOG_MEM_MALLOC = 1, @@ -123,23 +130,33 @@ typedef struct SMPStatInput { } SMPStatInput; typedef struct SMPStatItem { - int64_t exec; int64_t inErr; + int64_t exec; int64_t succ; int64_t fail; - int64_t orig; } SMPStatItem; +typedef struct SMPStatItemExt { + int64_t inErr; + int64_t exec; + int64_t succ; + int64_t fail; + int64_t origExec; + int64_t origSucc; + int64_t origFail; +} SMPStatItemExt; + typedef struct SMPMemoryStat { - SMPStatItem chunkMalloc; - SMPStatItem chunkRecycle; - SMPStatItem chunkReUse; - SMPStatItem chunkFree; - SMPStatItem memMalloc; - SMPStatItem memCalloc; - SMPStatItem memRealloc; - SMPStatItem strdup; - SMPStatItem memFree; + SMPStatItem memMalloc; + SMPStatItem memCalloc; + SMPStatItemExt memRealloc; + SMPStatItem strdup; + SMPStatItem memFree; + + SMPStatItem chunkMalloc; + SMPStatItem chunkRecycle; + SMPStatItem chunkReUse; + SMPStatItem chunkFree; } SMPMemoryStat; typedef struct SMPStatDetail { @@ -152,11 +169,19 @@ typedef struct SMPCtrlInfo { int64_t funcFlags; } SMPCtrlInfo; +typedef struct SMPStatSession { + int64_t initSucc; + int64_t initFail; + int64_t destroyNum; +} SMPStatSession; + typedef struct SMPStatInfo { - SMPStatDetail statDetail; - SHashObj* nodeStat; - SHashObj* fileStat; - SHashObj* lineStat; + SMPStatDetail statDetail; + SMPStatSession statSession; + SHashObj* sessStat; + SHashObj* nodeStat; + SHashObj* fileStat; + SHashObj* lineStat; } SMPStatInfo; typedef struct SMPSession { @@ -233,6 +258,13 @@ typedef struct SMemPool { SMPStatInfo stat; } SMemPool; +typedef struct SMemPoolMgmt { + SArray* poolList; + TdThreadMutex poolMutex; + TdThread poolMgmtThread; + int32_t code; +} SMemPoolMgmt; + #define MP_GET_FLAG(st, f) ((st) & (f)) #define MP_SET_FLAG(st, f) (st) |= (f) #define MP_CLR_FLAG(st, f) (st) &= (~f) @@ -242,6 +274,13 @@ enum { MP_WRITE, }; +#define MP_STAT_FORMAT "%s => \tinputError:%" PRId64 "\texec:%" PRId64 "\tsucc:%" PRId64 "\tfail:%" PRId64 +#define MP_STAT_ORIG_FORMAT "%s => \tinputError:%" PRId64 "\texec:%" PRId64 "\tsucc:%" PRId64 "\tfail:%" PRId64 "\torigExec:%" PRId64 "\torigSucc:%" PRId64 "\torigFail:%" PRId64 + +#define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail +#define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail + + #define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \ do { \ (_header)->size = _size; \ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b9edcf8680..bbc44dd12c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -105,6 +105,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param") +TAOS_DEFINE_ERROR(TSDB_CODE_SYSTEM_ERROR, "Operating system error") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 3f84747992..04e48e86dc 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -24,7 +24,7 @@ static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; static TdThreadMutex gMPoolMutex; threadlocal void* threadPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; - +SMemPoolMgmt gMPMgmt; int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { @@ -153,6 +153,8 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi pPool->allocNSChunkNum++; pPool->allocNSChunkSize += pPool->cfg.chunkSize; + *ppChunk = pChunk; + return TSDB_CODE_SUCCESS; } @@ -204,6 +206,9 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { return TSDB_CODE_INVALID_MEM_POOL_PARAM; } + pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL; + pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT; + pPool->threadChunkReserveNum = 1; pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); @@ -345,6 +350,12 @@ _return: return pRes; } +int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { + SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; + + return pHeader->size; +} + void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; @@ -370,6 +381,23 @@ _return: return res; } + +void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { + if (NULL == ptr) { + if (origSize) { + *origSize = 0; + } + + return; + } + + if (origSize) { + *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + } + + return; +} + void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) { void *res = NULL; @@ -402,104 +430,168 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64 return res; } - -void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { - if (NULL == ptr) { - if (origSize) { - *origSize = 0; - } - +void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName) { + if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { return; } - if (origSize) { - *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); - } + uInfo("MemPool [%s] stat detail:", detailName); - return; + uInfo("[times]:"); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memMalloc", pDetail->times.memMalloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memCalloc", pDetail->times.memCalloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memRealloc", pDetail->times.memRealloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memStrdup", pDetail->times.strdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memFree", pDetail->times.memFree)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->times.chunkMalloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->times.chunkRecycle)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->times.chunkReUse)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->times.chunkFree)); + + uInfo("[bytes]:"); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memMalloc", pDetail->bytes.memMalloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memCalloc", pDetail->bytes.memCalloc)); + uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("memRealloc", pDetail->bytes.memRealloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memStrdup", pDetail->bytes.strdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memFree", pDetail->bytes.memFree)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->bytes.chunkMalloc)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->bytes.chunkRecycle)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->bytes.chunkReUse)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->bytes.chunkFree)); + } -int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { - SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; - - return pHeader->size; +void memPoolPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { + //TODO } +void memPoolPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { + //TODO +} + +void memPoolPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) { + if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { + return; + } + + uInfo("MemPool [%s] session stat:", detailName); + uInfo("init session succeed num: %" PRId64, pSessStat->initSucc); + uInfo("init session failed num: %" PRId64, pSessStat->initFail); + uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum); +} + +void memPoolPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) { + char detailName[128]; + + if (NULL != pSession) { + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); + } + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName); + memPoolPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine"); + detailName[sizeof(detailName) - 1] = 0; + memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); +} void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) { switch (item) { case E_MP_STAT_LOG_MEM_MALLOC: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, size); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, size); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, size); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, pInput->size); } break; } case E_MP_STAT_LOG_MEM_CALLOC:{ if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, size); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, size); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, size); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, pInput->size); } break; } case E_MP_STAT_LOG_MEM_REALLOC:{ if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize); } break; } case E_MP_STAT_LOG_MEM_FREE:{ if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.memFree.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.exec, size); + atomic_add_fetch_64(&pDetail->bytes.memFree.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.memFree.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.succ, size); + atomic_add_fetch_64(&pDetail->bytes.memFree.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { atomic_add_fetch_64(&pDetail->times.memFree.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.fail, size); + atomic_add_fetch_64(&pDetail->bytes.memFree.fail, pInput->size); } break; } case E_MP_STAT_LOG_MEM_STRDUP: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.strdup.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.exec, size); + atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.strdup.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.succ, size); + atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { atomic_add_fetch_64(&pDetail->times.strdup.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.fail, size); + atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size); } break; } @@ -544,11 +636,29 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, } } +void* memPoolMgmtThreadFunc(void* param) { + +} void taosMemPoolModInit(void) { - taosThreadMutexInit(&gMPoolMutex, NULL); + taosThreadMutexInit(&gMPMgmt.poolMutex, NULL); - gMPoolList = taosArrayInit(10, POINTER_BYTES); + gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES); + if (NULL == gMPMgmt.poolList) { + gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY; + return; + } + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, memPoolMgmtThreadFunc, NULL) != 0) { + uError("failed to create memPool mgmt thread since %s", strerror(errno)); + gMPMgmt.code = TSDB_CODE_SYSTEM_ERROR; + return; + } + + taosThreadAttrDestroy(&thAttr); } int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { @@ -588,9 +698,14 @@ _return: return code; } -void taosMemPoolDestroySession(void* session) { +void taosMemPoolDestroySession(void* poolHandle, void* session) { + SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; //TODO; + + memPoolPrintStat(pPool, pSession, "DestroySession"); + + atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); } int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { @@ -601,6 +716,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); + memcpy(&pSession->ctrlInfo, &pPool->ctrlInfo, sizeof(pSession->ctrlInfo)); + MP_ERR_JRET(memPoolGetChunk(pPool, &pChunk)); pSession->allocChunkNum = 1; @@ -612,8 +729,11 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { _return: if (TSDB_CODE_SUCCESS != code) { - taosMemPoolDestroySession(pSession); + taosMemPoolDestroySession(poolHandle, pSession); pSession = NULL; + atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1); + } else { + atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1); } *ppSession = pSession;