diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 6ddd906700..2be0561ce7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -94,6 +94,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); +void qWorkerStopAllTasks(void *qWorkerMgmt); + void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a03dc7d9f9..636decc60b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -385,6 +385,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x072D) #define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x072E) #define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F) +#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 9a4b5bd275..ef0ee6ac0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -28,7 +28,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } -void vnodeQueryPreClose(SVnode *pVnode) { qworkerStopAllTasks((void *)pVnode->pQuery); } +void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 20d4a5dbae..07a63f9f83 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1058,6 +1058,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 883d5d6544..9b1c0caf36 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -688,7 +688,7 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { } taosWUnLockLatch(&pTaskInfo->stopInfo.lock); - return TSDB_CODE_SUCCESS; + return; } void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 36c6817595..a9eca64675 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -246,7 +246,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -254,7 +254,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -262,7 +262,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = c; \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 911a7a4594..e45beb7e13 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -683,6 +683,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool queryStop = false; do { + ctx = NULL; + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1162,11 +1164,13 @@ _return: QW_RET(code); } -void qworkerStopAllTasks(void *qWorkerMgmt) { +void qWorkerStopAllTasks(void *qWorkerMgmt) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); - + + uint64_t qId, tId; + int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; @@ -1187,8 +1191,6 @@ void qworkerStopAllTasks(void *qWorkerMgmt) { if (QW_QUERY_RUNNING(ctx)) { qwKillTaskHandle(ctx); - } else { - qwDropTask(QW_FPARAMS()); } QW_UNLOCK(QW_WRITE, &ctx->lock); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0e6568d692..c032664ab1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in i TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")