From c9d9c3ffd03a8ae2fc5668780d71ef7cd056d9f1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 28 Jun 2022 09:28:50 +0800 Subject: [PATCH] enh: stop query --- source/client/src/clientEnv.c | 5 ++++ source/client/src/clientHb.c | 5 ++++ source/libs/catalog/src/ctgAsync.c | 4 +-- source/libs/catalog/src/ctgCache.c | 2 +- source/libs/scheduler/inc/schedulerInt.h | 4 +-- source/libs/scheduler/src/schJob.c | 32 ++++++------------------ tests/script/api/stopquery.c | 5 ++-- 7 files changed, 23 insertions(+), 34 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index f5b5c48d47..56b675596f 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -132,9 +132,14 @@ void closeAllRequests(SHashObj *pRequests) { void destroyAppInst(SAppInstInfo* pAppInfo) { tscDebug("destroy app inst mgr %p", pAppInfo); + + taosThreadMutexLock(&appInfo.mutex); hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey)); + + taosThreadMutexUnlock(&appInfo.mutex); + taosMemoryFreeClear(pAppInfo->instKey); closeTransporter(pAppInfo); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 43e83dd501..2de630e181 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -275,8 +275,11 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) int32_t rspNum = taosArrayGetSize(pRsp.rsps); + taosThreadMutexLock(&appInfo.mutex); + SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { + taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); taosMemoryFreeClear(param); tFreeClientHbBatchRsp(&pRsp); @@ -300,6 +303,8 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) } } + taosThreadMutexUnlock(&appInfo.mutex); + tFreeClientHbBatchRsp(&pRsp); return code; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 683cbdb47c..455f2bd6a7 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -826,8 +826,8 @@ _return: pJob->jobResCode = code; - taosSsleep(2); - qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); + //taosSsleep(2); + //qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); taosAsyncExec(ctgCallUserCb, pJob, NULL); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 301cdbef20..77c1e5b8b1 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1083,7 +1083,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 843ce7d55a..a119795787 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -218,9 +218,7 @@ typedef struct SSchJob { int32_t levelIdx; SEpSet dataSrcEps; SHashObj *taskList; - SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* - SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* - SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* + SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* SHashObj *flowCtrl; // key is ep, element is SSchFlowControl SExplainCtx *explainCtx; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 7843482af3..89f355d78c 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -85,20 +85,6 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->succTasks = - taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->succTasks) { - SCH_JOB_ELOG("taosHashInit %d succTasks failed", pReq->pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pJob->failTasks = - taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->failTasks) { - SCH_JOB_ELOG("taosHashInit %d failTasks failed", pReq->pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - tsem_init(&pJob->rspSem, 0, 0); refId = taosAddRef(schMgmt.jobRef, pJob); @@ -724,6 +710,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +/* int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); @@ -801,6 +788,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { return TSDB_CODE_SUCCESS; } +*/ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { int8_t status = 0; @@ -1047,9 +1035,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); - if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { - SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); - } else { + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -1115,8 +1101,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_END_TS(pTask); - SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); @@ -1150,8 +1134,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; - SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - SCH_RET(schProcessOnJobPartialSuccess(pJob)); } @@ -1466,8 +1448,8 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { void schDropJobAllTasks(SSchJob *pJob) { schDropTaskInHashList(pJob, pJob->execTasks); - schDropTaskInHashList(pJob, pJob->succTasks); - schDropTaskInHashList(pJob, pJob->failTasks); +// schDropTaskInHashList(pJob, pJob->succTasks); +// schDropTaskInHashList(pJob, pJob->failTasks); } int32_t schCancelJob(SSchJob *pJob) { @@ -1507,8 +1489,8 @@ void schFreeJobImpl(void *job) { schFreeFlowCtrl(pJob); taosHashCleanup(pJob->execTasks); - taosHashCleanup(pJob->failTasks); - taosHashCleanup(pJob->succTasks); +// taosHashCleanup(pJob->failTasks); +// taosHashCleanup(pJob->succTasks); taosHashCleanup(pJob->taskList); taosArrayDestroy(pJob->levels); diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index addd1272cf..4c7964c983 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -36,7 +36,7 @@ int64_t st, et; char hostName[128]; char dbName[128]; char tbName[128]; -int32_t runTimes = 1; +int32_t runTimes = 10000; typedef struct { int id; @@ -367,8 +367,7 @@ void *closeThreadFp(void *arg) { SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; while (true) { if (qParam->taos) { - //usleep(rand() % 10000); - usleep(1000000); + usleep(rand() % 10000); taos_close(qParam->taos); break; }