fix: job retry issues

This commit is contained in:
dapan1121 2024-12-10 17:27:02 +08:00
parent 7fd0d024fc
commit 14b9979325
13 changed files with 79 additions and 50 deletions

View File

@ -283,6 +283,9 @@ This document details the server error codes that may be encountered when using
| 0x80000729 | Task message error | Query message error | Preserve the scene and logs, report issue on GitHub |
| 0x8000072B | Task status error | Subquery status error | Preserve the scene and logs, report issue on GitHub |
| 0x8000072F | Job not exist | Query JOB no longer exists | Preserve the scene and logs, report issue on GitHub |
| 0x80000739 | Query memory upper limit is reached | Single query memory upper limit is reached | Modify memory upper limit size or optimize SQL |
| 0x8000073A | Query memory exhausted | Query memory in dnode is exhausted | Limit concurrent queries or add more physical memory |
| 0x8000073B | Timeout for long time no fetch | Query without fetch for a long time | Correct application to fetch data asap |
## grant

View File

@ -294,6 +294,9 @@ description: TDengine 服务端的错误码列表和详细说明
| 0x80000729 | Task message error | 查询消息错误 | 保留现场和日志github上报issue |
| 0x8000072B | Task status error | 子查询状态错误 | 保留现场和日志github上报issue |
| 0x8000072F | Job not exist | 查询JOB已经不存在 | 保留现场和日志github上报issue |
| 0x80000739 | Query memory upper limit is reached | 单个查询达到内存使用上限 | 设置合理的内存上限或调整 SQL 语句 |
| 0x8000073A | Query memory exhausted | dnode查询内存到达使用上限 | 设置合理的内存上限或调整并发查询量或增大系统内存 |
| 0x8000073B | Timeout for long time no fetch | 查询被长时间中断未恢复 | 调整应用实现尽快 fetch 数据 |
## grant

View File

@ -159,8 +159,6 @@ int32_t taosGetErrSize();
#define TSDB_CODE_SOCKET_ERROR TAOS_DEF_ERROR_CODE(0, 0x0139)
#define TSDB_CODE_UNSUPPORT_OS TAOS_DEF_ERROR_CODE(0, 0x013A)
#define TSDB_CODE_TIME_ERROR TAOS_DEF_ERROR_CODE(0, 0x013B)
#define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x013C)
#define TSDB_CODE_SYSTEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x013D)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
@ -646,8 +644,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_QRY_TASK_SUCC_TO_PARTSUSS TAOS_DEF_ERROR_CODE(0, 0x0738)
#define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0739)
#define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x073A)
#define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x073B)
#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073C)
#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073B)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)

View File

@ -547,7 +547,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
QW_TASK_ELOG_E("query already end");
QW_TASK_ELOG("query already end, phase:%d", phase);
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
}

View File

@ -163,6 +163,7 @@ typedef struct SSchCallbackParamHeader {
typedef struct SSchTaskCallbackParam {
SSchCallbackParamHeader head;
uint64_t queryId;
uint64_t seriousId;
int64_t refId;
uint64_t clientId;
uint64_t taskId;
@ -228,6 +229,7 @@ typedef struct SSchTask {
uint64_t clientId; // current client id
uint64_t taskId; // task id
uint64_t seriousId;
uint64_t failedSeriousId;
SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times
@ -627,7 +629,7 @@ void schCloseJobRef(void);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
int32_t schDumpEpSet(SEpSet *pEpSet, char **ppRes);
char *schGetOpStr(SCH_OP_TYPE type);

View File

@ -345,7 +345,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob->levelNum = levelNum;
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
SCH_JOB_DLOG("job seriousId set to 0x%" PRIx64, pJob->seriousId);
SSchLevel level = {0};
SNodeListNode *plans = NULL;
@ -1021,6 +1023,8 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
atomic_add_fetch_64(&pJob->seriousId, 1);
int32_t code = 0;
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
@ -1047,17 +1051,17 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo
SCH_UNLOCK_TASK(pTask);
SCH_RET(code);
}
schResetTaskForRetry(pJob, pTask);
SCH_LOCK(SCH_WRITE, &pTask->planLock);
qClearSubplanExecutionNode(pTask->plan);
SCH_UNLOCK(SCH_WRITE, &pTask->planLock);
schResetTaskForRetry(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
}
}
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
SCH_JOB_DLOG("update job sId to %" PRId64, pJob->seriousId);

View File

@ -142,7 +142,7 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgSize = pMsg->len;
int32_t msgType = pMsg->msgType;
@ -444,17 +444,22 @@ _return:
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t seriousId, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, seriousId, execId));
}
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) {
SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
#if 0
if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
@ -470,7 +475,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
pTask->redirectCtx.inRedirect = false;
SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode));
SCH_RET(schProcessResponseMsg(pJob, pTask, pMsg, rspCode));
_return:
@ -488,7 +493,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
tstrerror(rspCode));
SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode);
code = schHandleResponseMsg(pJob, pTask, pParam->seriousId, pParam->execId, pMsg, rspCode);
pMsg->pData = NULL;
schProcessOnCbEnd(pJob, pTask, code);
@ -506,8 +511,8 @@ _return:
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId,
pParam->clientId, pParam->taskId, code);
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x",
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
// called if drop task rsp received code
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
@ -523,8 +528,8 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId,
pParam->clientId, pParam->taskId, code);
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x",
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
@ -600,6 +605,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
}
param->queryId = pJob->queryId;
param->seriousId = pTask->seriousId;
param->refId = pJob->refId;
param->clientId = SCH_CLIENT_ID(pTask);
param->taskId = SCH_TASK_ID(pTask);

View File

@ -63,8 +63,10 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->seriousId = pJob->seriousId;
pTask->execId = -1;
pTask->failedExecId = -2;
pTask->failedSeriousId = 0;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pTask->clientId = getClientId();
pTask->taskId = schGenTaskId();
@ -161,16 +163,18 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId) {
if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
}
SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));
if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
if ((seriousId != pTask->seriousId || seriousId <= pTask->failedSeriousId) ||
(execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since seriousId:0x%" PRIx64 " or execId:%d is not lastest,"
"current seriousId:0x%" PRIx64 " execId %d, failedSeriousId:0x%" PRIx64 " failedExecId:%d, waitRetry %d",
seriousId, execId, pTask->seriousId, pTask->execId, pTask->failedSeriousId, pTask->failedExecId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
@ -185,6 +189,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
pTask->failedExecId = pTask->execId;
pTask->failedSeriousId = pTask->seriousId;
int8_t jobStatus = 0;
if (schJobNeedToStop(pJob, &jobStatus)) {
@ -1109,7 +1114,10 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
SCH_LOCK(SCH_WRITE, &pTask->planLock);
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
SCH_UNLOCK(SCH_WRITE, &pTask->planLock);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
@ -1190,14 +1198,21 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_LOCK_TASK(pTask);
}
int8_t status = 0;
int32_t code = 0;
(void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
pTask->retryTimes++;
pTask->waitRetry = false;
int8_t status = 0;
int32_t code = 0;
if (atomic_load_64(&pTask->seriousId) < atomic_load_64(&pJob->seriousId)) {
SCH_TASK_DLOG("task seriousId:0x%" PRIx64 " is smaller than job seriousId:0x%" PRIx64 ", skip launch",
pTask->seriousId, pJob->seriousId);
goto _return;
}
(void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
@ -1353,6 +1368,8 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
SSchTask *pTask = taosArrayGet(level->subTasks, i);
pTask->seriousId = pJob->seriousId;
SCH_TASK_DLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);
SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
}

View File

@ -54,7 +54,7 @@
namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg,
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t sId, int32_t execId, SDataBuf *pMsg,
int32_t rspCode);
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
@ -591,7 +591,7 @@ void *schtSendRsp(void *param) {
msg.msgType = TDMT_VND_SUBMIT_RSP;
msg.pData = rmsg;
(void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
(void)schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
@ -621,7 +621,7 @@ void *schtCreateFetchRspThread(void *param) {
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->seriousId, pJob->fetchTask->execId, &msg, 0);
(void)schReleaseJob(job);
@ -925,7 +925,7 @@ TEST(queryTest, normalCase) {
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -941,7 +941,7 @@ TEST(queryTest, normalCase) {
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}
@ -1040,7 +1040,7 @@ TEST(queryTest, readyFirstCase) {
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -1057,7 +1057,7 @@ TEST(queryTest, readyFirstCase) {
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}
@ -1163,7 +1163,7 @@ TEST(queryTest, flowCtrlCase) {
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}

View File

@ -819,7 +819,7 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) {
}
if (0 == line[0]) {
return TSDB_CODE_SYSTEM_ERROR;
return TSDB_CODE_UNSUPPORT_OS;
}
char tmp[32];

View File

@ -116,8 +116,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_BUFFER, "Out of buffer")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param")
TAOS_DEFINE_ERROR(TSDB_CODE_SYSTEM_ERROR, "Operating system error")
TAOS_DEFINE_ERROR(TSDB_CODE_INTERNAL_ERROR, "Internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_ERROR, "Internal error in time")
@ -517,7 +515,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_RANGE_ERROR, "Wrong filter range")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_INVALID_TYPE, "Invalid filter type")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_FETCH_TIMEOUT, "Timeout for long time no fetch")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_SUCC_TO_PARTSUSS, "Change task status from success to partial success")

View File

@ -37,17 +37,17 @@ SMPStrategyFp gMPFps[] = {
int32_t mpCheckCfg(SMemPoolCfg* cfg) {
if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
uError("invalid memory pool chunkSize:%d", cfg->chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
return TSDB_CODE_INVALID_PARA;
}
if (cfg->evicPolicy <= 0 || cfg->evicPolicy >= E_EVICT_MAX_VALUE) {
uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
return TSDB_CODE_INVALID_PARA;
}
if (cfg->threadNum <= 0) {
uError("invalid memory pool threadNum:%d", cfg->threadNum);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
@ -1294,7 +1294,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1326,7 +1326,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, num, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1357,7 +1357,7 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1409,7 +1409,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char*
if (NULL == poolHandle || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNCTION__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1446,7 +1446,7 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64
if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, ptr, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1506,7 +1506,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
if (NULL == poolHandle || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_PARA);
}
if (NULL == ptr) {
@ -1530,7 +1530,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, alignment, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1589,7 +1589,7 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d",
__FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_PARA);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1676,7 +1676,7 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) {
uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_PARA);
}
SMPSession* pSession = (SMPSession*)session;

View File

@ -738,7 +738,7 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
if (availSize < MPT_MIN_MEM_POOL_SIZE) {
uError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize);
return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
//return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
}
uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize);