From 14b9979325f01884d44241480da42a3e13f1e42f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 10 Dec 2024 17:27:02 +0800 Subject: [PATCH] fix: job retry issues --- docs/en/14-reference/09-error-code.md | 3 ++ docs/zh/14-reference/09-error-code.md | 3 ++ include/util/taoserror.h | 5 +-- source/libs/qworker/src/qworker.c | 2 +- source/libs/scheduler/inc/schInt.h | 4 ++- source/libs/scheduler/src/schJob.c | 8 +++-- source/libs/scheduler/src/schRemote.c | 24 +++++++++----- source/libs/scheduler/src/schTask.c | 33 ++++++++++++++----- source/libs/scheduler/test/schedulerTests.cpp | 16 ++++----- source/os/src/osSysinfo.c | 2 +- source/util/src/terror.c | 3 -- source/util/src/tmempool.c | 24 +++++++------- source/util/test/memPoolTest.cpp | 2 +- 13 files changed, 79 insertions(+), 50 deletions(-) diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 84504395b5..5674e1f620 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -283,6 +283,9 @@ This document details the server error codes that may be encountered when using | 0x80000729 | Task message error | Query message error | Preserve the scene and logs, report issue on GitHub | | 0x8000072B | Task status error | Subquery status error | Preserve the scene and logs, report issue on GitHub | | 0x8000072F | Job not exist | Query JOB no longer exists | Preserve the scene and logs, report issue on GitHub | +| 0x80000739 | Query memory upper limit is reached | Single query memory upper limit is reached | Modify memory upper limit size or optimize SQL | +| 0x8000073A | Query memory exhausted | Query memory in dnode is exhausted | Limit concurrent queries or add more physical memory | +| 0x8000073B | Timeout for long time no fetch | Query without fetch for a long time | Correct application to fetch data asap | ## grant diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 00e6a72b6f..bc432763f8 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -294,6 +294,9 @@ description: TDengine 服务端的错误码列表和详细说明 | 0x80000729 | Task message error | 查询消息错误 | 保留现场和日志,github上报issue | | 0x8000072B | Task status error | 子查询状态错误 | 保留现场和日志,github上报issue | | 0x8000072F | Job not exist | 查询JOB已经不存在 | 保留现场和日志,github上报issue | +| 0x80000739 | Query memory upper limit is reached | 单个查询达到内存使用上限 | 设置合理的内存上限或调整 SQL 语句 | +| 0x8000073A | Query memory exhausted | dnode查询内存到达使用上限 | 设置合理的内存上限或调整并发查询量或增大系统内存 | +| 0x8000073B | Timeout for long time no fetch | 查询被长时间中断未恢复 | 调整应用实现尽快 fetch 数据 | ## grant diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 14984f113d..31d1e3b6f0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -159,8 +159,6 @@ int32_t taosGetErrSize(); #define TSDB_CODE_SOCKET_ERROR TAOS_DEF_ERROR_CODE(0, 0x0139) #define TSDB_CODE_UNSUPPORT_OS TAOS_DEF_ERROR_CODE(0, 0x013A) #define TSDB_CODE_TIME_ERROR TAOS_DEF_ERROR_CODE(0, 0x013B) -#define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x013C) -#define TSDB_CODE_SYSTEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x013D) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) @@ -646,8 +644,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_QRY_TASK_SUCC_TO_PARTSUSS TAOS_DEF_ERROR_CODE(0, 0x0738) #define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0739) #define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x073A) -#define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x073B) -#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073C) +#define TSDB_CODE_QRY_NO_FETCH_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x073B) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 023ce651c7..6449132fc4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -547,7 +547,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { - QW_TASK_ELOG_E("query already end"); + QW_TASK_ELOG("query already end, phase:%d", phase); QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 64d4fe384b..607f43a06f 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -163,6 +163,7 @@ typedef struct SSchCallbackParamHeader { typedef struct SSchTaskCallbackParam { SSchCallbackParamHeader head; uint64_t queryId; + uint64_t seriousId; int64_t refId; uint64_t clientId; uint64_t taskId; @@ -228,6 +229,7 @@ typedef struct SSchTask { uint64_t clientId; // current client id uint64_t taskId; // task id uint64_t seriousId; + uint64_t failedSeriousId; SRWLatch lock; // task reentrant lock int32_t maxExecTimes; // task max exec times int32_t maxRetryTimes; // task max retry times @@ -627,7 +629,7 @@ void schCloseJobRef(void); int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schJobFetchRows(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob); -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList); int32_t schDumpEpSet(SEpSet *pEpSet, char **ppRes); char *schGetOpStr(SCH_OP_TYPE type); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 588af5f16e..25052d2c15 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -345,7 +345,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pJob->levelNum = levelNum; SCH_RESET_JOB_LEVEL_IDX(pJob); + atomic_add_fetch_64(&pJob->seriousId, 1); + SCH_JOB_DLOG("job seriousId set to 0x%" PRIx64, pJob->seriousId); SSchLevel level = {0}; SNodeListNode *plans = NULL; @@ -1021,6 +1023,8 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode)); + atomic_add_fetch_64(&pJob->seriousId, 1); + int32_t code = 0; int32_t numOfLevels = taosArrayGetSize(pJob->levels); for (int32_t i = 0; i < numOfLevels; ++i) { @@ -1047,17 +1051,17 @@ int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, boo SCH_UNLOCK_TASK(pTask); SCH_RET(code); } + schResetTaskForRetry(pJob, pTask); + SCH_LOCK(SCH_WRITE, &pTask->planLock); qClearSubplanExecutionNode(pTask->plan); SCH_UNLOCK(SCH_WRITE, &pTask->planLock); - schResetTaskForRetry(pJob, pTask); SCH_UNLOCK_TASK(pTask); } } SCH_RESET_JOB_LEVEL_IDX(pJob); - atomic_add_fetch_64(&pJob->seriousId, 1); SCH_JOB_DLOG("update job sId to %" PRId64, pJob->seriousId); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index c5d8b0fde5..fd255f53cf 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -142,7 +142,7 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { return TSDB_CODE_SUCCESS; } -int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { +int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; @@ -444,17 +444,22 @@ _return: // Note: no more task error processing, handled in function internal -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t seriousId, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; int32_t msgType = pMsg->msgType; bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, seriousId, execId)); } SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); + if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) { + SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId); + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); + } + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); #if 0 if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { @@ -470,7 +475,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa pTask->redirectCtx.inRedirect = false; - SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode)); + SCH_RET(schProcessResponseMsg(pJob, pTask, pMsg, rspCode)); _return: @@ -488,7 +493,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { tstrerror(rspCode)); SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); - code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode); + code = schHandleResponseMsg(pJob, pTask, pParam->seriousId, pParam->execId, pMsg, rspCode); pMsg->pData = NULL; schProcessOnCbEnd(pJob, pTask, code); @@ -506,8 +511,8 @@ _return: int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, - pParam->clientId, pParam->taskId, code); + qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", + pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code); // called if drop task rsp received code (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error @@ -523,8 +528,8 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; - qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, - pParam->clientId, pParam->taskId, code); + qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", + pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code); if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -600,6 +605,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo } param->queryId = pJob->queryId; + param->seriousId = pTask->seriousId; param->refId = pJob->refId; param->clientId = SCH_CLIENT_ID(pTask); param->taskId = SCH_TASK_ID(pTask); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 6c19667f98..6c84c50edb 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -63,8 +63,10 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->plan = pPlan; pTask->level = pLevel; + pTask->seriousId = pJob->seriousId; pTask->execId = -1; pTask->failedExecId = -2; + pTask->failedSeriousId = 0; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->clientId = getClientId(); pTask->taskId = schGenTaskId(); @@ -161,16 +163,18 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) { +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, uint64_t seriousId, int32_t execId) { if (dropExecNode) { SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId)); } SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId)); - if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it - SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, - pTask->execId, pTask->waitRetry); + if ((seriousId != pTask->seriousId || seriousId <= pTask->failedSeriousId) || + (execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it + SCH_TASK_DLOG("handle not updated since seriousId:0x%" PRIx64 " or execId:%d is not lastest," + "current seriousId:0x%" PRIx64 " execId %d, failedSeriousId:0x%" PRIx64 " failedExecId:%d, waitRetry %d", + seriousId, execId, pTask->seriousId, pTask->execId, pTask->failedSeriousId, pTask->failedExecId, pTask->waitRetry); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -185,6 +189,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } pTask->failedExecId = pTask->execId; + pTask->failedSeriousId = pTask->seriousId; int8_t jobStatus = 0; if (schJobNeedToStop(pJob, &jobStatus)) { @@ -1109,7 +1114,10 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { int32_t code = 0; if (NULL == pTask->msg) { // TODO add more detailed reason for failure + SCH_LOCK(SCH_WRITE, &pTask->planLock); code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen); + SCH_UNLOCK(SCH_WRITE, &pTask->planLock); + if (TSDB_CODE_SUCCESS != code) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); @@ -1190,14 +1198,21 @@ int32_t schLaunchTaskImpl(void *param) { SCH_LOCK_TASK(pTask); } - int8_t status = 0; - int32_t code = 0; - - (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; pTask->retryTimes++; pTask->waitRetry = false; + int8_t status = 0; + int32_t code = 0; + + if (atomic_load_64(&pTask->seriousId) < atomic_load_64(&pJob->seriousId)) { + SCH_TASK_DLOG("task seriousId:0x%" PRIx64 " is smaller than job seriousId:0x%" PRIx64 ", skip launch", + pTask->seriousId, pJob->seriousId); + goto _return; + } + + (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); + SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes); @@ -1352,6 +1367,8 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); pTask->seriousId = pJob->seriousId; + + SCH_TASK_DLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId); SCH_ERR_RET(schDelayLaunchTask(pJob, pTask)); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 6e13e37e88..a9878ec9a9 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -54,7 +54,7 @@ namespace { -extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, +extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t sId, int32_t execId, SDataBuf *pMsg, int32_t rspCode); extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode); @@ -591,7 +591,7 @@ void *schtSendRsp(void *param) { msg.msgType = TDMT_VND_SUBMIT_RSP; msg.pData = rmsg; - (void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + (void)schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -621,7 +621,7 @@ void *schtCreateFetchRspThread(void *param) { msg.msgType = TDMT_SCH_MERGE_FETCH_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0); + code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->seriousId, pJob->fetchTask->execId, &msg, 0); (void)schReleaseJob(job); @@ -925,7 +925,7 @@ TEST(queryTest, normalCase) { msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -941,7 +941,7 @@ TEST(queryTest, normalCase) { msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); ASSERT_EQ(code, 0); } @@ -1040,7 +1040,7 @@ TEST(queryTest, readyFirstCase) { msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -1057,7 +1057,7 @@ TEST(queryTest, readyFirstCase) { msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); ASSERT_EQ(code, 0); } @@ -1163,7 +1163,7 @@ TEST(queryTest, flowCtrlCase) { msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); ASSERT_EQ(code, 0); } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index b6c3b53cd8..526f1a33e4 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -819,7 +819,7 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) { } if (0 == line[0]) { - return TSDB_CODE_SYSTEM_ERROR; + return TSDB_CODE_UNSUPPORT_OS; } char tmp[32]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index df9482089d..26948325bd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -116,8 +116,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_BUFFER, "Out of buffer") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param") -TAOS_DEFINE_ERROR(TSDB_CODE_SYSTEM_ERROR, "Operating system error") TAOS_DEFINE_ERROR(TSDB_CODE_INTERNAL_ERROR, "Internal error") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_ERROR, "Internal error in time") @@ -517,7 +515,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_RANGE_ERROR, "Wrong filter range") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_FILTER_INVALID_TYPE, "Invalid filter type") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NO_FETCH_TIMEOUT, "Timeout for long time no fetch") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_SUCC_TO_PARTSUSS, "Change task status from success to partial success") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 9eb37a6eb1..49e5dccc50 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -37,17 +37,17 @@ SMPStrategyFp gMPFps[] = { int32_t mpCheckCfg(SMemPoolCfg* cfg) { if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { uError("invalid memory pool chunkSize:%d", cfg->chunkSize); - return TSDB_CODE_INVALID_MEM_POOL_PARAM; + return TSDB_CODE_INVALID_PARA; } if (cfg->evicPolicy <= 0 || cfg->evicPolicy >= E_EVICT_MAX_VALUE) { uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy); - return TSDB_CODE_INVALID_MEM_POOL_PARAM; + return TSDB_CODE_INVALID_PARA; } if (cfg->threadNum <= 0) { uError("invalid memory pool threadNum:%d", cfg->threadNum); - return TSDB_CODE_INVALID_MEM_POOL_PARAM; + return TSDB_CODE_INVALID_PARA; } return TSDB_CODE_SUCCESS; @@ -1294,7 +1294,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1326,7 +1326,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, num, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1357,7 +1357,7 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1409,7 +1409,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* if (NULL == poolHandle || NULL == fileName || NULL == ptr) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", __FUNCTION__, poolHandle, session, fileName, ptr); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1446,7 +1446,7 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64 if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, ptr, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1506,7 +1506,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha if (NULL == poolHandle || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", __FUNCTION__, poolHandle, session, fileName); - MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_PARA); } if (NULL == ptr) { @@ -1530,7 +1530,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, alignment, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1589,7 +1589,7 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d", __FUNCTION__, poolHandle, session, fileName, size); - MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_PARA); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1676,7 +1676,7 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t int32_t code = TSDB_CODE_SUCCESS; if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) { uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize); - MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_PARA); } SMPSession* pSession = (SMPSession*)session; diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 38f496a931..d0c08b1318 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -738,7 +738,7 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; if (availSize < MPT_MIN_MEM_POOL_SIZE) { uError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize); - return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; + //return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; } uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize);