[td-11818] free still reachable buffer.

This commit is contained in:
Haojun Liao 2022-01-06 10:55:20 +08:00
parent 7916c1e622
commit b00bc84157
4 changed files with 93 additions and 58 deletions

View File

@ -203,12 +203,21 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag)
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res); int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
if (code != TSDB_CODE_SUCCESS) {
// handle error and retry
} else {
if (*pJob != NULL) {
scheduleFreeJob(*pJob);
}
}
pRequest->affectedRows = res.numOfRows; pRequest->affectedRows = res.numOfRows;
return res.code; return res.code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob);
} }
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {

View File

@ -505,15 +505,15 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
// for(int32_t i = 0; i < 10000; ++i) { for(int32_t i = 0; i < 1000; ++i) {
// char sql[512] = {0}; char sql[512] = {0};
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
// TAOS_RES* pres = taos_query(pConn, sql); TAOS_RES* pres = taos_query(pConn, sql);
// if (taos_errno(pres) != 0) { if (taos_errno(pres) != 0) {
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
// } }
// taos_free_result(pres); taos_free_result(pres);
// } }
taos_close(pConn); taos_close(pConn);
} }
@ -521,11 +521,11 @@ TEST(testCase, create_multiple_tables) {
TEST(testCase, generated_request_id_test) { TEST(testCase, generated_request_id_test) {
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for(int32_t i = 0; i < 50000000; ++i) { for(int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId(); uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v)); void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) { if (result != nullptr) {
printf("0x%"PRIx64", index:%d\n", v, i); printf("0x%lx, index:%d\n", v, i);
} }
assert(result == nullptr); assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0); taosHashPut(phash, &v, sizeof(v), NULL, 0);

View File

@ -82,12 +82,11 @@ typedef struct SSchJobAttr {
} SSchJobAttr; } SSchJobAttr;
typedef struct SSchJob { typedef struct SSchJob {
uint64_t queryId; uint64_t queryId;
int32_t levelNum; int32_t levelNum;
int32_t levelIdx; int32_t levelIdx;
int8_t status; int8_t status;
SSchJobAttr attr; SSchJobAttr attr;
SQueryProfileSummary summary;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SEpAddr resEp; SEpAddr resEp;
void *transport; void *transport;
@ -95,18 +94,20 @@ typedef struct SSchJob {
tsem_t rspSem; tsem_t rspSem;
int32_t userFetch; int32_t userFetch;
int32_t remoteFetch; int32_t remoteFetch;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
void *res; void *res;
int32_t resNumOfRows; int32_t resNumOfRows;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SArray *levels; // Element is SQueryLevel, starting from 0. SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
SQueryProfileSummary summary;
} SSchJob; } SSchJob;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE

View File

@ -18,8 +18,7 @@
#include "query.h" #include "query.h"
#include "catalog.h" #include "catalog.h"
SSchedulerMgmt schMgmt = {0}; static SSchedulerMgmt schMgmt = {0};
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) { for (int32_t i = 0; i < job->levelNum; ++i) {
@ -93,11 +92,30 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
SSchTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) {
pJob->attr.needFetch = false;
} else {
pJob->attr.queryJob = true;
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { task.plan = plan;
task.level = pLevel;
task.status = JOB_TASK_STATUS_NOT_START;
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
return task;
}
static void cleanupTask(SSchTask* pTask) {
taosArrayDestroy(pTask->condidateAddrs);
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
job->queryId = dag->queryId; pJob->queryId = dag->queryId;
if (dag->numOfSubplans <= 0) { if (dag->numOfSubplans <= 0) {
qError("invalid subplan num:%d", dag->numOfSubplans); qError("invalid subplan num:%d", dag->numOfSubplans);
@ -115,20 +133,20 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == job->levels) { if (NULL == pJob->levels) {
qError("taosArrayInit %d failed", levelNum); qError("taosArrayInit %d failed", levelNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
//?? //??
job->attr.needFetch = true; pJob->attr.needFetch = true;
job->levelNum = levelNum;
job->levelIdx = levelNum - 1;
job->subPlans = dag->pSubplans; pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
pJob->subPlans = dag->pSubplans;
SSchLevel level = {0}; SSchLevel level = {0};
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
@ -138,12 +156,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
level.status = JOB_TASK_STATUS_NOT_START; level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
if (NULL == taosArrayPush(job->levels, &level)) { if (NULL == taosArrayPush(pJob->levels, &level)) {
qError("taosArrayPush failed"); qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
pLevel = taosArrayGet(job->levels, i); pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i; pLevel->level = i;
levelPlans = taosArrayGetP(dag->pSubplans, i); levelPlans = taosArrayGetP(dag->pSubplans, i);
@ -168,20 +186,13 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
for (int32_t n = 0; n < levelPlanNum; ++n) { for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGetP(levelPlans, n); SSubplan *plan = taosArrayGetP(levelPlans, n);
SSchTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) { if (plan->type == QUERY_TYPE_MODIFY) {
job->attr.needFetch = false; pJob->attr.needFetch = false;
} else { } else {
job->attr.queryJob = true; pJob->attr.queryJob = true;
} }
SSchTask task = initTask(pJob, plan, pLevel);
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan;
task.level = pLevel;
task.status = JOB_TASK_STATUS_NOT_START;
void *p = taosArrayPush(pLevel->subTasks, &task); void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) { if (NULL == p) {
qError("taosArrayPush failed"); qError("taosArrayPush failed");
@ -193,10 +204,9 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
} }
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
if (planToTask) { if (planToTask) {
taosHashCleanup(planToTask); taosHashCleanup(planToTask);
@ -839,7 +849,6 @@ void schDropJobAllTasks(SSchJob *job) {
void *pIter = taosHashIterate(job->succTasks, NULL); void *pIter = taosHashIterate(job->succTasks, NULL);
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter); pIter = taosHashIterate(job->succTasks, pIter);
@ -933,16 +942,15 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag,
code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
if (0 != code) { if (0 != code) {
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId); qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} else { } else {
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
} }
job->status = JOB_TASK_STATUS_NOT_START; job->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job)); SCH_ERR_JRET(schLaunchJob(job));
*(SSchJob **)pJob = job; *(SSchJob **)pJob = job;
@ -954,7 +962,6 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
*(SSchJob **)pJob = NULL; *(SSchJob **)pJob = NULL;
scheduleFreeJob(job); scheduleFreeJob(job);
@ -1064,7 +1071,25 @@ void scheduleFreeJob(void *pJob) {
schDropJobAllTasks(job); schDropJobAllTasks(job);
} }
//TODO free job taosArrayDestroy(job->subPlans);
int32_t numOfLevels = taosArrayGetSize(job->levels);
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(job->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
cleanupTask(pTask);
}
taosArrayDestroy(pLevel->subTasks);
}
taosHashCleanup(job->execTasks);
taosHashCleanup(job->failTasks);
taosHashCleanup(job->succTasks);
taosArrayDestroy(job->levels);
tfree(job);
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {