refactor(stream): drop task in async ways
This commit is contained in:
parent
3eab0d7954
commit
fd96b096ea
|
@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
|||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||
bool isLeader, bool restored);
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen);
|
||||
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
|
||||
|
|
|
@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
|
|||
*/
|
||||
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting);
|
||||
|
||||
bool qTaskIsExecuting(qTaskInfo_t qinfo);
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ extern "C" {
|
|||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||
#define STREAM_EXEC_T_DROP_ONE_TASK (-8)
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
typedef struct SStreamQueue SStreamQueue;
|
||||
|
@ -752,15 +753,20 @@ void streamMetaCleanup();
|
|||
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn, int32_t vgId,
|
||||
int64_t stage, startComplete_fn_t fn, SStreamMeta** pMeta);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
||||
|
||||
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey);
|
||||
|
||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
|
||||
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
|
||||
void streamMetaClear(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
|
|
|
@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
|||
}
|
||||
|
||||
case TDMT_STREAM_TASK_DROP:
|
||||
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
|
||||
return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen);
|
||||
case TDMT_VND_STREAM_TASK_UPDATE:
|
||||
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
|
||||
case TDMT_VND_STREAM_TASK_RESET:
|
||||
|
|
|
@ -1302,7 +1302,7 @@ _checkpoint:
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
|
||||
return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
|
|
|
@ -260,13 +260,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
// stream do update the nodeEp info, write it into stream meta.
|
||||
if (updated) {
|
||||
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
|
||||
code = streamMetaSaveTask(pMeta, pTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pTask);
|
||||
if (code) {
|
||||
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pHTask != NULL) {
|
||||
code = streamMetaSaveTask(pMeta, pHTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pHTask);
|
||||
if (code) {
|
||||
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
|
||||
}
|
||||
|
@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
@ -720,29 +720,40 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
|
||||
// drop the related fill-history task firstly
|
||||
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||
code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||
code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
||||
if (code) {
|
||||
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
|
||||
tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId,
|
||||
vgId, (int32_t)hTaskId.taskId, tstrerror(code));
|
||||
} else {
|
||||
tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId,
|
||||
(int32_t)hTaskId.taskId);
|
||||
}
|
||||
}
|
||||
|
||||
// drop the stream task now
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
||||
if (code) {
|
||||
tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
|
||||
tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code));
|
||||
} else {
|
||||
tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId);
|
||||
}
|
||||
|
||||
// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
// if (code) {
|
||||
// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
|
||||
// }
|
||||
|
||||
// commit the update
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||
// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId);
|
||||
return 0; // always return success
|
||||
}
|
||||
|
||||
|
@ -857,6 +868,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_DROP_ONE_TASK) {
|
||||
code = streamMetaDropTask(pMeta, req.streamId, req.taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
|
|
|
@ -972,20 +972,28 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
||||
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) {
|
||||
int64_t st = taosGetTimestampMs();
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
||||
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||
|
||||
while (1) {
|
||||
taosWLockLatch(&pTaskInfo->lock);
|
||||
if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
taosMsleep(100);
|
||||
|
||||
taosMsleep(200);
|
||||
|
||||
int64_t d = taosGetTimestampMs() - st;
|
||||
if (d >= waitingDuration && waitingDuration >= 0) {
|
||||
qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else { // not running now
|
||||
pTaskInfo->code = rspCode;
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
|
|
|
@ -625,14 +625,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
{ // destroy the related fill-history tasks
|
||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
|
||||
id, vgId, pReq->taskId, numOfTasks);
|
||||
}
|
||||
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaCommit(pMeta);
|
||||
code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -697,7 +693,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
|
||||
pTask->status.taskStatus = TASK_STATUS__READY;
|
||||
|
||||
code = streamMetaSaveTask(pMeta, pTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -708,10 +704,12 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
|
||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
|
||||
(int32_t)pReq->hTaskId, numOfTasks);
|
||||
code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code));
|
||||
} else {
|
||||
stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId);
|
||||
}
|
||||
}
|
||||
|
||||
code = streamMetaCommit(pMeta);
|
||||
|
|
|
@ -632,7 +632,7 @@ void streamMetaCloseImpl(void* arg) {
|
|||
}
|
||||
|
||||
// todo let's check the status for each task
|
||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
void* buf = NULL;
|
||||
int32_t len;
|
||||
|
@ -682,7 +682,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
|
||||
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
|
||||
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
|
||||
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
|
||||
if (code != 0) {
|
||||
|
@ -705,7 +705,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
|
||||
if (p != NULL) {
|
||||
stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
|
||||
stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
|
||||
tFreeStreamTask(pTask);
|
||||
return code;
|
||||
}
|
||||
|
@ -735,7 +735,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
return code;
|
||||
}
|
||||
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
|
||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||
|
||||
|
@ -885,6 +885,8 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
|
|||
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||
int32_t code = 0;
|
||||
int32_t waitingDuration = 5000;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||
if (code) {
|
||||
|
@ -895,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
|||
|
||||
// let's kill the query procedure within stream, to end it ASAP.
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
@ -932,7 +934,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
|
||||
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||
code = streamMetaRemoveTask(pMeta, &id);
|
||||
code = streamMetaRemoveTaskInMeta(pMeta, &id);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
|
||||
}
|
||||
|
@ -963,6 +965,32 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t numOfTasks = 0;
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||
numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
code = streamMetaCommit(pMeta);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||
} else {
|
||||
stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||
streamMetaWLock(pMeta);
|
||||
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
|
@ -1185,7 +1213,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
||||
STaskId* pId = taosArrayGet(pRecycleList, i);
|
||||
code = streamMetaRemoveTask(pMeta, pId);
|
||||
code = streamMetaRemoveTaskInMeta(pMeta, pId);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -703,7 +703,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
@ -862,7 +862,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
|||
pStreamTask->status.taskStatus = TASK_STATUS__READY;
|
||||
}
|
||||
|
||||
code = streamMetaSaveTask(pMeta, pStreamTask);
|
||||
code = streamMetaSaveTaskInMeta(pMeta, pStreamTask);
|
||||
streamMutexUnlock(&(pStreamTask->lock));
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
@ -1025,7 +1025,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
|||
// in case of fill-history task, stop the tsdb file scan operation.
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||
code = qKillTask(pExecutor, TSDB_CODE_SUCCESS, 10000);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||
|
@ -1287,6 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) {
|
|||
return "resume-task-from-idle";
|
||||
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
||||
return "record-start-failed-task";
|
||||
case STREAM_EXEC_T_DROP_ONE_TASK:
|
||||
return "drop-one-task";
|
||||
case 0:
|
||||
return "exec-all-tasks";
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue