enh: add concurrent task control

This commit is contained in:
dapan1121 2024-07-03 19:26:23 +08:00
parent 55ba9054ea
commit 1bcba8f604
14 changed files with 307 additions and 64 deletions

View File

@ -69,6 +69,10 @@ extern float tsSelectivityRatio;
extern int32_t tsTagFilterResCacheSize;
// queue & threads
extern int32_t tsQueryMinConcurrentTaskNum;
extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum;
extern int32_t tsNumOfQueryThreads;
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsTimeToGetAvailableConn;

View File

@ -24,18 +24,39 @@ extern "C" {
#define MEMPOOL_MAX_CHUNK_SIZE (1 << 30)
#define MEMPOOL_MIN_CHUNK_SIZE (1 << 20)
typedef enum MemPoolEvictPolicy{
typedef enum MemPoolEvictPolicy {
E_EVICT_ALL = 1,
E_EVICT_NONE,
E_EVICT_AUTO,
E_EVICT_MAX_VALUE, // no used
} MemPoolEvictPolicy;
typedef enum MemPoolUsageLevel {
E_MEM_USAGE_LOW = 0,
E_MEM_USAGE_MIDDLE,
E_MEM_USAGE_HIGH,
E_MEM_USAGE_EXTRAME,
E_MEM_USAGE_MAX_VALUE
} MemPoolUsageLevel;
typedef void (*decConcSessionNum)(void);
typedef void (*incConcSessionNum)(void);
typedef void (*setConcSessionNum)(int32_t);
typedef struct SMemPoolCallBack {
decConcSessionNum decSessFp;
incConcSessionNum incSessFp;
setConcSessionNum setSessFp;
} SMemPoolCallBack;
typedef struct SMemPoolCfg {
int64_t maxSize;
int64_t sessionExpectSize;
int32_t chunkSize;
int32_t threadNum;
int8_t usageLevel[E_MEM_USAGE_MAX_VALUE];
MemPoolEvictPolicy evicPolicy;
SMemPoolCallBack cb;
} SMemPoolCfg;
void taosMemPoolModInit(void);
@ -52,7 +73,7 @@ void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void);
void taosAutoMemoryFree(void *ptr);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession);
void taosMemPoolDestroySession(void* session);
void taosMemPoolDestroySession(void* poolHandle, void* session);
extern threadlocal void* threadPoolHandle;

View File

@ -146,6 +146,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
#define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal
#define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x0137)
#define TSDB_CODE_SYSTEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0138)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -57,11 +57,16 @@ int32_t tsShellActivityTimer = 3; // second
int8_t tsQueryUseMemoryPool = 1;
// queue & threads
int32_t tsQueryMinConcurrentTaskNum = 1;
int32_t tsQueryMaxConcurrentTaskNum = 0;
int32_t tsQueryConcurrentTaskNum = 0;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
int32_t tsNumOfQueryThreads = 0;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 16;
int32_t tsNumOfMnodeQueryThreads = 16;

View File

@ -209,6 +209,8 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1;
}
tsNumOfQueryThreads += tsNumOfMnodeQueryThreads;
SSingleWorkerCfg fCfg = {
.min = tsNumOfMnodeFetchThreads,
.max = tsNumOfMnodeFetchThreads,

View File

@ -100,8 +100,8 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
SSingleWorkerCfg queryCfg = {
.min = tsNumOfVnodeQueryThreads,
.max = tsNumOfVnodeQueryThreads,
.min = tsNumOfQnodeQueryThreads,
.max = tsNumOfQnodeQueryThreads,
.name = "qnode-query",
.fp = (FItem)qmProcessQueue,
.param = pMgmt,
@ -112,6 +112,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1;
}
tsNumOfQueryThreads += tsNumOfQnodeQueryThreads;
SSingleWorkerCfg fetchCfg = {
.min = tsNumOfQnodeFetchThreads,
.max = tsNumOfQnodeFetchThreads,

View File

@ -399,6 +399,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool->max = tsNumOfVnodeQueryThreads;
if (tQWorkerInit(pQPool) != 0) return -1;
tsNumOfQueryThreads += tsNumOfVnodeQueryThreads;
SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
pStreamPool->name = "vnode-stream";
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;

View File

@ -44,6 +44,15 @@ extern "C" {
#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576)
#define QW_MIN_MEM_POOL_SIZE (256 * 1048576)
#define QW_DEFAULT_THREAD_TASK_NUM 3
enum {
QW_CONC_TASK_LEVEL_LOW = 1,
QW_CONC_TASK_LEVEL_MIDDLE,
QW_CONC_TASK_LEVEL_HIGH,
QW_CONC_TASK_LEVEL_FULL
};
enum {
QW_PHASE_PRE_QUERY = 1,
QW_PHASE_POST_QUERY,
@ -222,18 +231,25 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx;
} SQWorkerMgmt;
typedef struct SQueryMgmt {
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
void* memPoolHandle;
} SQueryMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
#define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS()
extern void* gQueryPoolHandle;
extern SQueryMgmt gQueryMgmt;
#define QW_SINK_ENABLE_MEMPOOL(_ctx) \
do { \
if ((_ctx)->sinkWithMemPool) { \
taosEnableMemoryPoolUsage(gQueryPoolHandle, (_ctx)->memPoolSession); \
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, (_ctx)->memPoolSession); \
} \
} while (0)

View File

@ -1,8 +1,6 @@
#include "qwInt.h"
#include "qworker.h"
void* gQueryPoolHandle = NULL;
int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576;
@ -21,6 +19,28 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu
return TSDB_CODE_SUCCESS;
}
void qwSetConcurrentTaskNum(int32_t taskNum) {
int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM);
if (tsQueryMaxConcurrentTaskNum > 0) {
finTaskNum = TMIN(taskNum, tsQueryMaxConcurrentTaskNum);
}
finTaskNum = TMAX(finTaskNum, tsQueryMinConcurrentTaskNum);
atomic_store_32(&tsQueryConcurrentTaskNum, finTaskNum);
atomic_store_32(&gQueryMgmt.concTaskLevel, QW_CONC_TASK_LEVEL_FULL);
}
void qwDecConcurrentTaskNum(void) {
int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel);
if (concTaskLevel < QW_CONC_TASK_LEVEL_LOW) {
qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel);
return;
}
}
void qwInitQueryPool(void) {
int64_t memSize = 0;
@ -37,13 +57,19 @@ void qwInitQueryPool(void) {
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.cb.setSessFp = qwSetConcurrentTaskNum;
cfg.cb.decSessFp = qwDecConcurrentTaskNum;
cfg.cb.incSessFp = qwIncConcurrentTaskNum;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) {
return;
}
taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryPoolHandle);
code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle);
if (TSDB_CODE_SUCCESS != code) {
return;
}
}

View File

@ -276,7 +276,7 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
qDestroyTask(otaskHandle);
taosDisableMemoryPoolUsage();
@ -320,6 +320,8 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
}
taosArrayDestroy(ctx->tbInfo);
taosMemPoolDestroySession(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
}
static void freeExplainExecItem(void *param) {

View File

@ -19,6 +19,8 @@ SQWorkerMgmt gQwMgmt = {
};
TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
SQueryMgmt gQueryMgmt;
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
@ -153,7 +155,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (taskHandle) {
qwDbgSimulateSleep();
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
taosDisableMemoryPoolUsage();
@ -731,8 +733,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->sId = sId;
ctx->phase = -1;
if (NULL != gQueryPoolHandle) {
QW_ERR_JRET(taosMemPoolInitSession(gQueryPoolHandle, &ctx->memPoolSession));
if (NULL != gQueryMgmt.memPoolHandle) {
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, &ctx->memPoolSession));
}
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
@ -774,7 +776,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code);
}
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableMemoryPoolUsage();

View File

@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "tlockfree.h"
#include "thash.h"
#define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500
@ -52,12 +53,18 @@ extern "C" {
#define MP_STAT_FLAG_LOG_SOME_NODE_STAT (1 << 10)
#define MP_STAT_FLAG_LOG_SOME_POOL_STAT (1 << 11)
#define MP_STAT_FLAG_LOG_ALL (0xFFFFFFFFFFFFFFFF)
// STAT PROCESURE FLAGS
#define MP_STAT_PROC_FLAG_EXEC (1 << 0)
#define MP_STAT_PROC_FLAG_INPUT_ERR (1 << 1)
#define MP_STAT_PROC_FLAG_RES_SUCC (1 << 2)
#define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3)
// CTRL FUNC FLAGS
#define MP_CTRL_FLAG_PRINT_STAT (1 << 0)
typedef enum EMPStatLogItem {
E_MP_STAT_LOG_MEM_MALLOC = 1,
@ -123,23 +130,33 @@ typedef struct SMPStatInput {
} SMPStatInput;
typedef struct SMPStatItem {
int64_t exec;
int64_t inErr;
int64_t exec;
int64_t succ;
int64_t fail;
int64_t orig;
} SMPStatItem;
typedef struct SMPStatItemExt {
int64_t inErr;
int64_t exec;
int64_t succ;
int64_t fail;
int64_t origExec;
int64_t origSucc;
int64_t origFail;
} SMPStatItemExt;
typedef struct SMPMemoryStat {
SMPStatItem chunkMalloc;
SMPStatItem chunkRecycle;
SMPStatItem chunkReUse;
SMPStatItem chunkFree;
SMPStatItem memMalloc;
SMPStatItem memCalloc;
SMPStatItem memRealloc;
SMPStatItem strdup;
SMPStatItem memFree;
SMPStatItem memMalloc;
SMPStatItem memCalloc;
SMPStatItemExt memRealloc;
SMPStatItem strdup;
SMPStatItem memFree;
SMPStatItem chunkMalloc;
SMPStatItem chunkRecycle;
SMPStatItem chunkReUse;
SMPStatItem chunkFree;
} SMPMemoryStat;
typedef struct SMPStatDetail {
@ -152,11 +169,19 @@ typedef struct SMPCtrlInfo {
int64_t funcFlags;
} SMPCtrlInfo;
typedef struct SMPStatSession {
int64_t initSucc;
int64_t initFail;
int64_t destroyNum;
} SMPStatSession;
typedef struct SMPStatInfo {
SMPStatDetail statDetail;
SHashObj* nodeStat;
SHashObj* fileStat;
SHashObj* lineStat;
SMPStatDetail statDetail;
SMPStatSession statSession;
SHashObj* sessStat;
SHashObj* nodeStat;
SHashObj* fileStat;
SHashObj* lineStat;
} SMPStatInfo;
typedef struct SMPSession {
@ -233,6 +258,13 @@ typedef struct SMemPool {
SMPStatInfo stat;
} SMemPool;
typedef struct SMemPoolMgmt {
SArray* poolList;
TdThreadMutex poolMutex;
TdThread poolMgmtThread;
int32_t code;
} SMemPoolMgmt;
#define MP_GET_FLAG(st, f) ((st) & (f))
#define MP_SET_FLAG(st, f) (st) |= (f)
#define MP_CLR_FLAG(st, f) (st) &= (~f)
@ -242,6 +274,13 @@ enum {
MP_WRITE,
};
#define MP_STAT_FORMAT "%s => \tinputError:%" PRId64 "\texec:%" PRId64 "\tsucc:%" PRId64 "\tfail:%" PRId64
#define MP_STAT_ORIG_FORMAT "%s => \tinputError:%" PRId64 "\texec:%" PRId64 "\tsucc:%" PRId64 "\tfail:%" PRId64 "\torigExec:%" PRId64 "\torigSucc:%" PRId64 "\torigFail:%" PRId64
#define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail
#define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail
#define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \
do { \
(_header)->size = _size; \

View File

@ -105,6 +105,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param")
TAOS_DEFINE_ERROR(TSDB_CODE_SYSTEM_ERROR, "Operating system error")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -24,7 +24,7 @@ static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
static TdThreadMutex gMPoolMutex;
threadlocal void* threadPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL;
SMemPoolMgmt gMPMgmt;
int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
@ -153,6 +153,8 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi
pPool->allocNSChunkNum++;
pPool->allocNSChunkSize += pPool->cfg.chunkSize;
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
}
@ -204,6 +206,9 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL;
pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT;
pPool->threadChunkReserveNum = 1;
pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
@ -345,6 +350,12 @@ _return:
return pRes;
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
}
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
@ -370,6 +381,23 @@ _return:
return res;
}
void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
if (NULL == ptr) {
if (origSize) {
*origSize = 0;
}
return;
}
if (origSize) {
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
}
return;
}
void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) {
void *res = NULL;
@ -402,104 +430,168 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64
return res;
}
void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
if (NULL == ptr) {
if (origSize) {
*origSize = 0;
}
void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName) {
if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
return;
}
if (origSize) {
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
}
uInfo("MemPool [%s] stat detail:", detailName);
return;
uInfo("[times]:");
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memMalloc", pDetail->times.memMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memCalloc", pDetail->times.memCalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memRealloc", pDetail->times.memRealloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memStrdup", pDetail->times.strdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memFree", pDetail->times.memFree));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->times.chunkMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->times.chunkRecycle));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->times.chunkReUse));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->times.chunkFree));
uInfo("[bytes]:");
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memMalloc", pDetail->bytes.memMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memCalloc", pDetail->bytes.memCalloc));
uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("memRealloc", pDetail->bytes.memRealloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memStrdup", pDetail->bytes.strdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memFree", pDetail->bytes.memFree));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->bytes.chunkMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->bytes.chunkRecycle));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->bytes.chunkReUse));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->bytes.chunkFree));
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
void memPoolPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
//TODO
}
void memPoolPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
//TODO
}
void memPoolPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) {
if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
return;
}
uInfo("MemPool [%s] session stat:", detailName);
uInfo("init session succeed num: %" PRId64, pSessStat->initSucc);
uInfo("init session failed num: %" PRId64, pSessStat->initFail);
uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum);
}
void memPoolPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) {
char detailName[128];
if (NULL != pSession) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
}
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName);
memPoolPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine");
detailName[sizeof(detailName) - 1] = 0;
memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
}
void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
switch (item) {
case E_MP_STAT_LOG_MEM_MALLOC: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, size);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, size);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, size);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_CALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, size);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, size);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, size);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_REALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize);
}
break;
}
case E_MP_STAT_LOG_MEM_FREE:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memFree.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.exec, size);
atomic_add_fetch_64(&pDetail->bytes.memFree.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memFree.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.succ, size);
atomic_add_fetch_64(&pDetail->bytes.memFree.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memFree.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.fail, size);
atomic_add_fetch_64(&pDetail->bytes.memFree.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_STRDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strdup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.exec, size);
atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strdup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.succ, size);
atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strdup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.fail, size);
atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size);
}
break;
}
@ -544,11 +636,29 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item,
}
}
void* memPoolMgmtThreadFunc(void* param) {
}
void taosMemPoolModInit(void) {
taosThreadMutexInit(&gMPoolMutex, NULL);
taosThreadMutexInit(&gMPMgmt.poolMutex, NULL);
gMPoolList = taosArrayInit(10, POINTER_BYTES);
gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES);
if (NULL == gMPMgmt.poolList) {
gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY;
return;
}
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, memPoolMgmtThreadFunc, NULL) != 0) {
uError("failed to create memPool mgmt thread since %s", strerror(errno));
gMPMgmt.code = TSDB_CODE_SYSTEM_ERROR;
return;
}
taosThreadAttrDestroy(&thAttr);
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
@ -588,9 +698,14 @@ _return:
return code;
}
void taosMemPoolDestroySession(void* session) {
void taosMemPoolDestroySession(void* poolHandle, void* session) {
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
//TODO;
memPoolPrintStat(pPool, pSession, "DestroySession");
atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
}
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
@ -601,6 +716,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
memcpy(&pSession->ctrlInfo, &pPool->ctrlInfo, sizeof(pSession->ctrlInfo));
MP_ERR_JRET(memPoolGetChunk(pPool, &pChunk));
pSession->allocChunkNum = 1;
@ -612,8 +729,11 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
_return:
if (TSDB_CODE_SUCCESS != code) {
taosMemPoolDestroySession(pSession);
taosMemPoolDestroySession(poolHandle, pSession);
pSession = NULL;
atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1);
} else {
atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1);
}
*ppSession = pSession;