fix: ut issues

This commit is contained in:
dapan1121 2024-11-07 16:46:18 +08:00
parent c5c85efeb5
commit dd2ab5b361
5 changed files with 23 additions and 16 deletions

View File

@ -113,6 +113,11 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
int32_t qWorkerDbgEnableDebug(char *option);
void qWorkerRetireJob(uint64_t jobId, int32_t errCode);
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode);
#ifdef __cplusplus
}
#endif

View File

@ -25,6 +25,7 @@
#endif
#include "dmUtil.h"
#include "tcs.h"
#include "qworker.h"
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"

View File

@ -115,7 +115,7 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId);
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo));
QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo));
code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -168,7 +168,7 @@ int32_t mpUpdateCfg(SMemPool* pPool) {
}
uDebug("memPool %s cfg updated, reserveSize:%dMB, jobQuota:%dMB, threadNum:%d",
pPool->name, pPool->cfg.reserveSize, pPool->cfg.jobQuota, pPool->cfg.threadNum);
pPool->name, *pPool->cfg.reserveSize, *pPool->cfg.jobQuota, pPool->cfg.threadNum);
return TSDB_CODE_SUCCESS;
}
@ -313,7 +313,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32(pPool->cfg.reserveSize) * 1048576UL) + size)) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB",
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, *pPool->cfg.reserveSize);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);

View File

@ -37,6 +37,7 @@
#include "tdef.h"
#include "tvariant.h"
#include "stub.h"
#include "../inc/tmempoolInt.h"
namespace {
@ -155,9 +156,10 @@ typedef struct {
} SMPTestJobCtx;
typedef struct {
int64_t jobQuota;
int32_t jobQuota;
bool reserveMode;
int64_t upperLimitSize;
int32_t reserveSize;
int32_t threadNum;
int32_t randTask;
} SMPTestParam;
@ -487,7 +489,7 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
return TSDB_CODE_SUCCESS;
}
void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) {
void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
uint64_t jobId = 0;
int64_t retiredSize = 0;
@ -514,37 +516,36 @@ void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) {
}
void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId));
void mptRetireJobCb(uint64_t jobId, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId));
if (NULL == pJob) {
uError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
void mptInitPool(void) {
SMemPoolCfg cfg = {0};
cfg.reserveMode = mptCtx.param.reserveMode;
if (!mptCtx.param.reserveMode) {
cfg.upperLimitSize = mptCtx.param.upperLimitSize;
//cfg.upperLimitSize = mptCtx.param.upperLimitSize;
} else {
int64_t memSize = 0;
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
cfg.reserveSize = memSize / 1048576UL * MP_DEFAULT_RESERVE_MEM_PERCENT / 100;
cfg.reserveSize = &mptCtx.param.reserveSize;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = mptCtx.param.jobQuota;
cfg.cb.retireJobsFp = mptRetireJobsCb;
cfg.cb.retireJobFp = mptRetireJobCb;
cfg.jobQuota = &mptCtx.param.jobQuota;
cfg.cb.failFp = mptRetireJobsCb;
cfg.cb.reachFp = mptRetireJobCb;
ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
}