diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 6ddd906700..2be0561ce7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -94,6 +94,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); +void qWorkerStopAllTasks(void *qWorkerMgmt); + void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a03dc7d9f9..636decc60b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -385,6 +385,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x072D) #define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x072E) #define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F) +#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 29af2bda67..83f375c986 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -77,6 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryPreClose(SVnode *pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 8c2036b97b..77d375bc45 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -242,7 +242,10 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } +void vnodePreClose(SVnode *pVnode) { + vnodeQueryPreClose(pVnode); + vnodeSyncPreClose(pVnode); +} void vnodeClose(SVnode *pVnode) { if (pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index efedc12d80..ef0ee6ac0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -28,6 +28,8 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } +void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); } + void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e1db1f4729..b0da277cfb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -153,6 +153,16 @@ typedef struct { SSchemaWrapper* qsw; } SSchemaInfo; +typedef struct { + int32_t operatorType; + int64_t refId; +} SExchangeOpStopInfo; + +typedef struct { + SRWLatch lock; + SArray* pStopInfo; +} STaskStopInfo; + typedef struct SExecTaskInfo { STaskIdInfo id; uint32_t status; @@ -171,6 +181,7 @@ typedef struct SExecTaskInfo { SSubplan* pSubplan; struct SOperatorInfo* pRoot; SLocalFetch localFetch; + STaskStopInfo stopInfo; } SExecTaskInfo; enum { @@ -1050,6 +1061,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 049de727df..c57a1b38eb 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -65,6 +65,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); @@ -286,6 +289,9 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); + SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; + qAppendTaskStopInfo(pTaskInfo, &stopInfo); + pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; @@ -543,6 +549,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->cost.openCost = taosGetTimestampUs() - startTs; tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } @@ -562,6 +572,9 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1aa9a3c613..428af19a6c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -659,6 +659,54 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { return pTaskInfo->code; } +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo); + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return TSDB_CODE_SUCCESS; +} + +int32_t stopInfoComp(void const* lp, void const* rp) { + SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp; + SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp; + + if (key->refId < pInfo->refId) { + return -1; + } else if (key->refId > pInfo->refId) { + return 1; + } + + return 0; +} + +void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ); + if (idx >= 0) { + taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx); + } + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return; +} + +void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + + int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo); + for (int32_t i = 0; i < num; ++i) { + SExchangeOpStopInfo *pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); + SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); + if (pExchangeInfo) { + tsem_post(&pExchangeInfo->ready); + taosReleaseRef(exchangeObjRefPool, pStop->refId); + } + } + + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); +} + int32_t qAsyncKillTask(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; @@ -667,7 +715,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { } qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo); + + qStopTaskOperators(pTaskInfo); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7fd288cd57..baa5cb6479 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2597,6 +2597,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; pTaskInfo->pTableInfoList = tableListCreate(); + pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3210,6 +3211,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } + taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 36c6817595..a9eca64675 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -246,7 +246,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -254,7 +254,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -262,7 +262,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = c; \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 1871316260..4c4a41df82 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -91,11 +91,53 @@ _return: void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { QW_LOCK(QW_READ, &sch->tasksLock); - QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, - taosHashGetSize(sch->tasksHash)); + int32_t taskNum = taosHashGetSize(sch->tasksHash); + QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum); + + uint64_t qId, tId; + int32_t eId; + SQWTaskStatus *pTask = NULL; + void *pIter = taosHashIterate(sch->tasksHash, NULL); + while (pIter) { + pTask = (SQWTaskStatus *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status); + + pIter = taosHashIterate(sch->tasksHash, pIter); + } + QW_UNLOCK(QW_READ, &sch->tasksLock); } +void qwDbgDumpTasksInfo(SQWorker *mgmt) { + QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + + int32_t i = 0; + SQWTaskCtx *ctx = NULL; + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, " + "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " + "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", + ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType, + ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, + ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, + ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], + ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } + +} + void qwDbgDumpMgmtInfo(SQWorker *mgmt) { if (!gQWDebug.dumpEnable) { return; @@ -120,7 +162,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + qwDbgDumpTasksInfo(mgmt); } int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index e9ded9b269..e13791ae89 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,9 +281,11 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t code = 0; + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { + qDebug("start to kill task"); code = qAsyncKillTask(taskHandle); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a7cd3db824..e45beb7e13 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -683,6 +683,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool queryStop = false; do { + ctx = NULL; + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1162,6 +1164,41 @@ _return: QW_RET(code); } +void qWorkerStopAllTasks(void *qWorkerMgmt) { + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + + QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); + + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_LOCK(QW_WRITE, &ctx->lock); + + QW_TASK_DLOG_E("start to force stop task"); + + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task already dropping"); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + continue; + } + + if (QW_QUERY_RUNNING(ctx)) { + qwKillTaskHandle(ctx); + } + + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } +} + void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a1162d2e94..b406432616 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in i TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")