fix(stream): drop task in synch model in write thread.
This commit is contained in:
parent
8848ae61a4
commit
465e5eabd7
|
@ -58,7 +58,7 @@ extern "C" {
|
||||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||||
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||||
#define STREAM_EXEC_T_DROP_ONE_TASK (-8)
|
#define STREAM_EXEC_T_STOP_ONE_TASK (-8)
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
|
|
|
@ -720,40 +720,31 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, in
|
||||||
|
|
||||||
// drop the related fill-history task firstly
|
// drop the related fill-history task firstly
|
||||||
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||||
code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, (int32_t)hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||||
|
code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId,
|
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
|
||||||
vgId, (int32_t)hTaskId.taskId, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId,
|
|
||||||
(int32_t)hTaskId.taskId);
|
(int32_t)hTaskId.taskId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code));
|
tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
|
||||||
} else {
|
|
||||||
tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
|
||||||
// if (code) {
|
|
||||||
// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// commit the update
|
// commit the update
|
||||||
// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);
|
||||||
|
|
||||||
tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId);
|
|
||||||
return 0; // always return success
|
return 0; // always return success
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -868,7 +859,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||||
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||||
return code;
|
return code;
|
||||||
} else if (type == STREAM_EXEC_T_DROP_ONE_TASK) {
|
} else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
|
||||||
code = streamMetaDropTask(pMeta, req.streamId, req.taskId);
|
code = streamMetaDropTask(pMeta, req.streamId, req.taskId);
|
||||||
return code;
|
return code;
|
||||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||||
|
|
|
@ -982,24 +982,24 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) {
|
||||||
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
||||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||||
|
|
||||||
while (1) {
|
// while (1) {
|
||||||
taosWLockLatch(&pTaskInfo->lock);
|
// taosWLockLatch(&pTaskInfo->lock);
|
||||||
if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
|
// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
|
||||||
taosWUnLockLatch(&pTaskInfo->lock);
|
// taosWUnLockLatch(&pTaskInfo->lock);
|
||||||
|
//
|
||||||
taosMsleep(200);
|
// taosMsleep(200);
|
||||||
|
//
|
||||||
int64_t d = taosGetTimestampMs() - st;
|
// int64_t d = taosGetTimestampMs() - st;
|
||||||
if (d >= waitingDuration && waitingDuration >= 0) {
|
// if (d >= waitingDuration && waitingDuration >= 0) {
|
||||||
qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0);
|
||||||
return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
}
|
// }
|
||||||
} else { // not running now
|
// } else { // not running now
|
||||||
pTaskInfo->code = rspCode;
|
// pTaskInfo->code = rspCode;
|
||||||
taosWUnLockLatch(&pTaskInfo->lock);
|
// taosWUnLockLatch(&pTaskInfo->lock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
||||||
|
|
|
@ -622,15 +622,19 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
pReq->transId);
|
pReq->transId);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
{ // destroy the related fill-history tasks
|
{ // destroy the related fill-history tasks
|
||||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||||
if (pReq->dropRelHTask) {
|
if (pReq->dropRelHTask) {
|
||||||
code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||||
if (code) {
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code));
|
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
|
||||||
}
|
id, vgId, pReq->taskId, numOfTasks);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
if (pReq->dropRelHTask) {
|
||||||
|
code = streamMetaCommit(pMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// always return true
|
// always return true
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -704,12 +708,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
|
|
||||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||||
if (pReq->dropRelHTask) {
|
if (pReq->dropRelHTask) {
|
||||||
code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK);
|
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||||
if (code) {
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code));
|
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
|
||||||
} else {
|
(int32_t)pReq->hTaskId, numOfTasks);
|
||||||
stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamMetaCommit(pMeta);
|
code = streamMetaCommit(pMeta);
|
||||||
|
|
|
@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
@ -973,18 +973,18 @@ int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId)
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
// code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||||
numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
// numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
if (code) {
|
// if (code) {
|
||||||
stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
code = streamMetaCommit(pMeta);
|
// code = streamMetaCommit(pMeta);
|
||||||
if (code) {
|
// if (code) {
|
||||||
stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
|
||||||
} else {
|
// } else {
|
||||||
stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
|
// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
|
||||||
}
|
// }
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
|
|
@ -1287,8 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) {
|
||||||
return "resume-task-from-idle";
|
return "resume-task-from-idle";
|
||||||
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
||||||
return "record-start-failed-task";
|
return "record-start-failed-task";
|
||||||
case STREAM_EXEC_T_DROP_ONE_TASK:
|
case STREAM_EXEC_T_STOP_ONE_TASK:
|
||||||
return "drop-one-task";
|
return "stop-one-task";
|
||||||
case 0:
|
case 0:
|
||||||
return "exec-all-tasks";
|
return "exec-all-tasks";
|
||||||
default:
|
default:
|
||||||
|
|
Loading…
Reference in New Issue