From a657413fa6f30f2f6c862a98a4ca4de1af854d8e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 1 Jul 2022 11:10:35 +0800 Subject: [PATCH] enh: kill query --- source/client/src/clientEnv.c | 19 ++++++- source/libs/scheduler/src/schJob.c | 6 +- tests/script/api/stopquery.c | 91 ++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index a234311569..fefabf6539 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -153,7 +153,13 @@ void destroyAppInst(SAppInstInfo *pAppInfo) { } void destroyTscObj(void *pObj) { + if (NULL == pObj) { + return; + } + STscObj *pTscObj = pObj; + int64_t tscId = pTscObj->id; + tscDebug("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); @@ -168,6 +174,8 @@ void destroyTscObj(void *pObj) { } taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); + + tscDebug("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); } void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { @@ -261,9 +269,14 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } void doDestroyRequest(void *p) { - assert(p != NULL); + if (NULL == p) { + return; + } + SRequestObj *pRequest = (SRequestObj *)p; - + int64_t reqId = pRequest->self; + tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); if (pRequest->body.queryJob != 0) { @@ -285,6 +298,8 @@ void doDestroyRequest(void *p) { deregisterRequest(pRequest); } taosMemoryFreeClear(pRequest); + + tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); } void destroyRequest(SRequestObj *pRequest) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 425ea242fd..5d5dc745d0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1495,6 +1495,8 @@ void schFreeJobImpl(void *job) { uint64_t queryId = pJob->queryId; int64_t refId = pJob->refId; + qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { schCancelJob(pJob); } @@ -1535,12 +1537,12 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob->resData); taosMemoryFree(pJob); - qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); - int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); if (jobNum == 0) { schCloseJobRef(); } + + qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); } int32_t schLaunchStaticExplainJob(SSchedulerReq *pReq, SSchJob *pJob, bool sync) { diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index 4c7964c983..72fe4c406a 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -156,6 +156,28 @@ void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) { } } +void sqKillFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + taos_kill_query(qParam->taos); + + *qParam->end = 1; +} + +void sqKillQueryCb(void *param, TAOS_RES *pRes, int code) { + SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; + if (code == 0 && pRes) { + if (qParam->fetch) { + taos_fetch_rows_a(pRes, sqKillFetchCb, param); + } else { + taos_kill_query(qParam->taos); + *qParam->end = 1; + } + } else { + sqExit("select", taos_errstr(pRes)); + } +} + + int sqSyncStopQuery(bool fetch) { CASE_ENTER(); for (int32_t i = 0; i < runTimes; ++i) { @@ -391,6 +413,69 @@ int sqConSyncCloseQuery(bool fetch) { CASE_LEAVE(); } +int sqSyncKillQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + TAOS_RES* pRes = taos_query(taos, sql); + code = taos_errno(pRes); + if (code) { + sqExit("taos_query", taos_errstr(pRes)); + } + + if (fetch) { + taos_fetch_row(pRes); + } + + taos_kill_query(taos); + + taos_close(taos); + } + CASE_LEAVE(); +} + +int sqAsyncKillQuery(bool fetch) { + CASE_ENTER(); + for (int32_t i = 0; i < runTimes; ++i) { + char sql[1024] = {0}; + int32_t code = 0; + TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0); + if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL)); + + sprintf(sql, "reset query cache"); + sqExecSQL(taos, sql); + + sprintf(sql, "use %s", dbName); + sqExecSQL(taos, sql); + + sprintf(sql, "select * from %s", tbName); + + int32_t qEnd = 0; + SSP_CB_PARAM param = {0}; + param.fetch = fetch; + param.end = &qEnd; + taos_query_a(taos, sql, sqKillQueryCb, ¶m); + while (0 == qEnd) { + usleep(5000); + } + + taos_close(taos); + } + CASE_LEAVE(); +} + + void sqRunAllCase(void) { /* sqSyncStopQuery(false); @@ -409,8 +494,14 @@ void sqRunAllCase(void) { sqAsyncCloseQuery(true); */ sqConSyncCloseQuery(false); +/* sqConSyncCloseQuery(true); + sqSyncKillQuery(false); + sqSyncKillQuery(true); + sqAsyncKillQuery(false); + sqAsyncKillQuery(true); +*/ }