diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e76422ee34..19a6407b77 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -149,7 +149,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ - int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); @@ -162,6 +161,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @return */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); bool qTaskIsExecuting(qTaskInfo_t qinfo); @@ -171,14 +171,6 @@ bool qTaskIsExecuting(qTaskInfo_t qinfo); */ void qDestroyTask(qTaskInfo_t tinfo); -/** - * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. - * - * @param iter the table iterator to traverse all tables belongs to a super table, or an invert index - * @return - */ -int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); - void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5210a8cc66..ccd8b885a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -888,7 +888,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if (pTaskInfo != NULL) { -// qAsyncKillTask(pTaskInfo); + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); } taosWLockLatch(&pTq->lock); @@ -904,11 +904,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg qStreamCloseTsdbReader(pTaskInfo); } - // reset the error code. - if (pHandle->execHandle.task != NULL) { - - } - taosWUnLockLatch(&pTq->lock); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { taosMemoryFree(req.qmsg); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 86fcd319c5..1ea2101aff 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -749,6 +749,23 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (pTaskInfo == NULL) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo, rspCode); + + while(qTaskIsExecuting(pTaskInfo)) { + taosMsleep(10); + } + + pTaskInfo->code = rspCode; + return TSDB_CODE_SUCCESS; +} + bool qTaskIsExecuting(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (NULL == pTaskInfo) {