From b00bc84157c7ea004cd837d4ac5c9f6556fd33c6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 Jan 2022 10:55:20 +0800 Subject: [PATCH] [td-11818] free still reachable buffer. --- source/client/src/clientImpl.c | 11 ++- source/client/test/clientTests.cpp | 22 +++--- source/libs/scheduler/inc/schedulerInt.h | 29 ++++---- source/libs/scheduler/src/scheduler.c | 89 +++++++++++++++--------- 4 files changed, 93 insertions(+), 58 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a416033946..9f91fa8a23 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -203,12 +203,21 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; + int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res); + if (code != TSDB_CODE_SUCCESS) { + // handle error and retry + } else { + if (*pJob != NULL) { + scheduleFreeJob(*pJob); + } + } + pRequest->affectedRows = res.numOfRows; return res.code; } - return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob); } TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ef2dcc21df..cbd5689cc8 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -505,15 +505,15 @@ TEST(testCase, create_multiple_tables) { taos_free_result(pRes); -// for(int32_t i = 0; i < 10000; ++i) { -// char sql[512] = {0}; -// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); -// TAOS_RES* pres = taos_query(pConn, sql); -// if (taos_errno(pres) != 0) { -// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); -// } -// taos_free_result(pres); -// } + for(int32_t i = 0; i < 1000; ++i) { + char sql[512] = {0}; + snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); + TAOS_RES* pres = taos_query(pConn, sql); + if (taos_errno(pres) != 0) { + printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + } + taos_free_result(pres); + } taos_close(pConn); } @@ -521,11 +521,11 @@ TEST(testCase, create_multiple_tables) { TEST(testCase, generated_request_id_test) { SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - for(int32_t i = 0; i < 50000000; ++i) { + for(int32_t i = 0; i < 50000; ++i) { uint64_t v = generateRequestId(); void* result = taosHashGet(phash, &v, sizeof(v)); if (result != nullptr) { - printf("0x%"PRIx64", index:%d\n", v, i); + printf("0x%lx, index:%d\n", v, i); } assert(result == nullptr); taosHashPut(phash, &v, sizeof(v), NULL, 0); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index fa4ae0d152..be488ea87b 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -82,12 +82,11 @@ typedef struct SSchJobAttr { } SSchJobAttr; typedef struct SSchJob { - uint64_t queryId; - int32_t levelNum; - int32_t levelIdx; - int8_t status; - SSchJobAttr attr; - SQueryProfileSummary summary; + uint64_t queryId; + int32_t levelNum; + int32_t levelIdx; + int8_t status; + SSchJobAttr attr; SEpSet dataSrcEps; SEpAddr resEp; void *transport; @@ -95,18 +94,20 @@ typedef struct SSchJob { tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; - SSchTask *fetchTask; + int32_t errCode; void *res; int32_t resNumOfRows; - - SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* - SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* - SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* - - SArray *levels; // Element is SQueryLevel, starting from 0. - SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. + + SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* + SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* + SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* + + SArray *levels; // Element is SQueryLevel, starting from 0. + SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. + + SQueryProfileSummary summary; } SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9079912c40..8271d865a1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -18,8 +18,7 @@ #include "query.h" #include "catalog.h" -SSchedulerMgmt schMgmt = {0}; - +static SSchedulerMgmt schMgmt = {0}; int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { @@ -93,11 +92,30 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { return TSDB_CODE_SUCCESS; } +static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { + SSchTask task = {0}; + if (plan->type == QUERY_TYPE_MODIFY) { + pJob->attr.needFetch = false; + } else { + pJob->attr.queryJob = true; + } -int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { + task.plan = plan; + task.level = pLevel; + task.status = JOB_TASK_STATUS_NOT_START; + task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + + return task; +} + +static void cleanupTask(SSchTask* pTask) { + taosArrayDestroy(pTask->condidateAddrs); +} + +int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) { int32_t code = 0; - job->queryId = dag->queryId; + pJob->queryId = dag->queryId; if (dag->numOfSubplans <= 0) { qError("invalid subplan num:%d", dag->numOfSubplans); @@ -115,20 +133,20 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - - job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); - if (NULL == job->levels) { + + pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); + if (NULL == pJob->levels) { qError("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } //?? - job->attr.needFetch = true; - - job->levelNum = levelNum; - job->levelIdx = levelNum - 1; + pJob->attr.needFetch = true; - job->subPlans = dag->pSubplans; + pJob->levelNum = levelNum; + pJob->levelIdx = levelNum - 1; + + pJob->subPlans = dag->pSubplans; SSchLevel level = {0}; SArray *levelPlans = NULL; @@ -138,12 +156,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - if (NULL == taosArrayPush(job->levels, &level)) { + if (NULL == taosArrayPush(pJob->levels, &level)) { qError("taosArrayPush failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pLevel = taosArrayGet(job->levels, i); + pLevel = taosArrayGet(pJob->levels, i); pLevel->level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); @@ -168,20 +186,13 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGetP(levelPlans, n); - SSchTask task = {0}; - if (plan->type == QUERY_TYPE_MODIFY) { - job->attr.needFetch = false; + pJob->attr.needFetch = false; } else { - job->attr.queryJob = true; + pJob->attr.queryJob = true; } - - task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - task.plan = plan; - task.level = pLevel; - task.status = JOB_TASK_STATUS_NOT_START; - + SSchTask task = initTask(pJob, plan, pLevel); void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { qError("taosArrayPush failed"); @@ -193,10 +204,9 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } - } - SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); + SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); if (planToTask) { taosHashCleanup(planToTask); @@ -839,7 +849,6 @@ void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; - schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); pIter = taosHashIterate(job->succTasks, pIter); @@ -933,16 +942,15 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId); + qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } else { - qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } } job->status = JOB_TASK_STATUS_NOT_START; - SCH_ERR_JRET(schLaunchJob(job)); *(SSchJob **)pJob = job; @@ -954,7 +962,6 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, return TSDB_CODE_SUCCESS; _return: - *(SSchJob **)pJob = NULL; scheduleFreeJob(job); @@ -1064,7 +1071,25 @@ void scheduleFreeJob(void *pJob) { schDropJobAllTasks(job); } - //TODO free job + taosArrayDestroy(job->subPlans); + int32_t numOfLevels = taosArrayGetSize(job->levels); + for(int32_t i = 0; i < numOfLevels; ++i) { + SSchLevel *pLevel = taosArrayGet(job->levels, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); + cleanupTask(pTask); + } + + taosArrayDestroy(pLevel->subTasks); + } + + taosHashCleanup(job->execTasks); + taosHashCleanup(job->failTasks); + taosHashCleanup(job->succTasks); + taosArrayDestroy(job->levels); + tfree(job); } void schedulerDestroy(void) {