diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8f8691cfc2..737240d314 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -77,6 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryPreClose(SVnode *pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 8c2036b97b..77d375bc45 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -242,7 +242,10 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } +void vnodePreClose(SVnode *pVnode) { + vnodeQueryPreClose(pVnode); + vnodeSyncPreClose(pVnode); +} void vnodeClose(SVnode *pVnode) { if (pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index efedc12d80..9a4b5bd275 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -28,6 +28,8 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } +void vnodeQueryPreClose(SVnode *pVnode) { qworkerStopAllTasks((void *)pVnode->pQuery); } + void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ced668cf37..20d4a5dbae 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -153,6 +153,16 @@ typedef struct { SSchemaWrapper* qsw; } SSchemaInfo; +typedef struct { + int32_t operatorType; + int64_t refId; +} SExchangeOpStopInfo; + +typedef struct { + SRWLatch lock; + SArray* pStopInfo; +} STaskStopInfo; + typedef struct SExecTaskInfo { STaskIdInfo id; uint32_t status; @@ -171,6 +181,7 @@ typedef struct SExecTaskInfo { SSubplan* pSubplan; struct SOperatorInfo* pRoot; SLocalFetch localFetch; + STaskStopInfo stopInfo; } SExecTaskInfo; enum { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1cf01c8661..883d5d6544 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -659,6 +659,54 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { return pTaskInfo->code; } +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo); + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return TSDB_CODE_SUCCESS; +} + +int32_t stopInfoComp(void const* lp, void const* rp) { + SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp; + SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp; + + if (key->refId < pInfo->refId) { + return -1; + } else if (key->refId > pInfo->refId) { + return 1; + } + + return 0; +} + +void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ); + if (idx >= 0) { + taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx); + } + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return TSDB_CODE_SUCCESS; +} + +void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + + int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo); + for (int32_t i = 0; i < num; ++i) { + SExchangeOpStopInfo *pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); + SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); + if (pExchangeInfo) { + tsem_post(&pExchangeInfo->ready); + taosReleaseRef(exchangeObjRefPool, pStop->refId); + } + } + + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); +} + int32_t qAsyncKillTask(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; @@ -667,7 +715,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { } qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo); + + qStopTaskOperators(pTaskInfo); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7d662f8784..2dae6f2482 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1653,7 +1653,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t } typedef struct SFetchRspHandleWrapper { - uint32_t exchangeId; + int64_t exchangeId; int32_t sourceIndex; } SFetchRspHandleWrapper; @@ -1873,6 +1873,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); @@ -1983,6 +1986,10 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->cost.openCost = taosGetTimestampUs() - startTs; tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } @@ -2002,6 +2009,9 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); @@ -2217,6 +2227,9 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); + SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; + qAppendTaskStopInfo(pTaskInfo, &stopInfo); + pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; @@ -3219,6 +3232,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; pTaskInfo->pTableInfoList = tableListCreate(); + pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3832,6 +3846,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } + taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 1871316260..4c4a41df82 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -91,11 +91,53 @@ _return: void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { QW_LOCK(QW_READ, &sch->tasksLock); - QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, - taosHashGetSize(sch->tasksHash)); + int32_t taskNum = taosHashGetSize(sch->tasksHash); + QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum); + + uint64_t qId, tId; + int32_t eId; + SQWTaskStatus *pTask = NULL; + void *pIter = taosHashIterate(sch->tasksHash, NULL); + while (pIter) { + pTask = (SQWTaskStatus *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status); + + pIter = taosHashIterate(sch->tasksHash, pIter); + } + QW_UNLOCK(QW_READ, &sch->tasksLock); } +void qwDbgDumpTasksInfo(SQWorker *mgmt) { + QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + + int32_t i = 0; + SQWTaskCtx *ctx = NULL; + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, " + "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " + "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", + ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType, + ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, + ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, + ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], + ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } + +} + void qwDbgDumpMgmtInfo(SQWorker *mgmt) { if (!gQWDebug.dumpEnable) { return; @@ -120,7 +162,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + qwDbgDumpTasksInfo(mgmt); } int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index e9ded9b269..e13791ae89 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,9 +281,11 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t code = 0; + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { + qDebug("start to kill task"); code = qAsyncKillTask(taskHandle); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a7cd3db824..911a7a4594 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1162,6 +1162,41 @@ _return: QW_RET(code); } +void qworkerStopAllTasks(void *qWorkerMgmt) { + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + + QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); + + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_LOCK(QW_WRITE, &ctx->lock); + + QW_TASK_DLOG_E("start to force stop task"); + + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task already dropping"); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + continue; + } + + if (QW_QUERY_RUNNING(ctx)) { + qwKillTaskHandle(ctx); + } else { + qwDropTask(QW_FPARAMS()); + } + + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } +} + void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return;