diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4d5e18520c..69a11638b1 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 883c5f7b99..5b4224d575 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting); bool qTaskIsExecuting(qTaskInfo_t qinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a4d89dcdcc..cc12566389 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,6 +58,7 @@ extern "C" { #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) +#define STREAM_EXEC_T_DROP_ONE_TASK (-8) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -752,15 +753,20 @@ void streamMetaCleanup(); int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId, int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); + +int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store +int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey); + int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); +int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); + int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); + void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..6899140af9 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { } case TDMT_STREAM_TASK_DROP: - return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7d83dbcf84..1f755f816e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1302,7 +1302,7 @@ _checkpoint: } streamMetaWLock(pMeta); - if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) { streamMetaWUnLock(pMeta); taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..bd2a7207f2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen); } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 77632d328e..bfcbbf6c74 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -260,13 +260,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // stream do update the nodeEp info, write it into stream meta. if (updated) { tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); - code = streamMetaSaveTask(pMeta, pTask); + code = streamMetaSaveTaskInMeta(pMeta, pTask); if (code) { tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code)); } if (pHTask != NULL) { - code = streamMetaSaveTask(pMeta, pHTask); + code = streamMetaSaveTaskInMeta(pMeta, pHTask); if (code) { tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code)); } @@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -720,29 +720,40 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); - code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); if (code) { - tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, + tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId, + vgId, (int32_t)hTaskId.taskId, tstrerror(code)); + } else { + tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId, (int32_t)hTaskId.taskId); } } // drop the stream task now - code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK); if (code) { - tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); + tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code)); + } else { + tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId); } +// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); +// if (code) { +// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); +// } + // commit the update - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); +// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); +// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } streamMetaWUnLock(pMeta); + + tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId); return 0; // always return success } @@ -857,6 +868,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; + } else if (type == STREAM_EXEC_T_DROP_ONE_TASK) { + code = streamMetaDropTask(pMeta, req.streamId, req.taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1386b0b82f..3d60e4d797 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -972,20 +972,28 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { + int64_t st = taosGetTimestampMs(); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); + qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); while (1) { taosWLockLatch(&pTaskInfo->lock); if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again taosWUnLockLatch(&pTaskInfo->lock); - taosMsleep(100); + + taosMsleep(200); + + int64_t d = taosGetTimestampMs() - st; + if (d >= waitingDuration && waitingDuration >= 0) { + qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); + return TSDB_CODE_SUCCESS; + } } else { // not running now pTaskInfo->code = rspCode; taosWUnLockLatch(&pTaskInfo->lock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a49e50547..d62165a72c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -625,14 +625,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", - id, vgId, pReq->taskId, numOfTasks); - } - - if (pReq->dropRelHTask) { - code = streamMetaCommit(pMeta); + code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); + if (code) { + stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); + } } } @@ -697,7 +693,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pTask->status.taskStatus = TASK_STATUS__READY; - code = streamMetaSaveTask(pMeta, pTask); + code = streamMetaSaveTaskInMeta(pMeta, pTask); streamMutexUnlock(&pTask->lock); if (code != TSDB_CODE_SUCCESS) { @@ -708,10 +704,12 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, - (int32_t)pReq->hTaskId, numOfTasks); + code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); + if (code) { + stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); + } else { + stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId); + } } code = streamMetaCommit(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ddd0201460..ce1c228b7b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -632,7 +632,7 @@ void streamMetaCloseImpl(void* arg) { } // todo let's check the status for each task -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { +int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t vgId = pTask->pMeta->vgId; void* buf = NULL; int32_t len; @@ -682,7 +682,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return code; } -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { +int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) { int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { @@ -705,7 +705,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p != NULL) { - stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId); + stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId); tFreeStreamTask(pTask); return code; } @@ -735,7 +735,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return code; } - if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) { int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); void* pUnused = taosArrayPop(pMeta->pTaskList); @@ -885,6 +885,8 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { int32_t code = 0; + int32_t waitingDuration = 5000; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); if (code) { @@ -895,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { // let's kill the query procedure within stream, to end it ASAP. if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -932,7 +934,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - code = streamMetaRemoveTask(pMeta, &id); + code = streamMetaRemoveTaskInMeta(pMeta, &id); if (code) { stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code)); } @@ -963,6 +965,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } +int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + SStreamTask* pTask = NULL; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = 0; + + streamMetaWLock(pMeta); + + code = streamMetaUnregisterTask(pMeta, streamId, taskId); + numOfTasks = streamMetaGetNumOfTasks(pMeta); + if (code) { + stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); + } + + code = streamMetaCommit(pMeta); + if (code) { + stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); + } else { + stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); + } + + streamMetaWUnLock(pMeta); + + return code; +} + int32_t streamMetaBegin(SStreamMeta* pMeta) { streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, @@ -1185,7 +1213,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { STaskId* pId = taosArrayGet(pRecycleList, i); - code = streamMetaRemoveTask(pMeta, pId); + code = streamMetaRemoveTaskInMeta(pMeta, pId); if (code) { stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code)); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d27ed520c6..f0b288f096 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -703,7 +703,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { } if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code)); } @@ -862,7 +862,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { pStreamTask->status.taskStatus = TASK_STATUS__READY; } - code = streamMetaSaveTask(pMeta, pStreamTask); + code = streamMetaSaveTaskInMeta(pMeta, pStreamTask); streamMutexUnlock(&(pStreamTask->lock)); streamMetaReleaseTask(pMeta, pStreamTask); @@ -1025,7 +1025,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { // in case of fill-history task, stop the tsdb file scan operation. if (pTask->info.fillHistory == 1) { void* pExecutor = pTask->exec.pExecutor; - code = qKillTask(pExecutor, TSDB_CODE_SUCCESS); + code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000); } stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); @@ -1287,6 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; + case STREAM_EXEC_T_DROP_ONE_TASK: + return "drop-one-task"; case 0: return "exec-all-tasks"; default: