enh(stream): async handle pause event
This commit is contained in:
parent
7e866c5527
commit
d3e8adf2eb
|
@ -837,7 +837,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
|||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
||||
if (pHistoryTask == NULL) {
|
||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||
", it may have been dropped already",
|
||||
", it may have been dropped already",
|
||||
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
|
|
|
@ -410,6 +410,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t haltCallback(SStreamTask* pTask, void* param) {
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
streamTaskSendCheckpointReq(pTask);
|
||||
}
|
||||
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
@ -419,11 +424,12 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
|
||||
code = streamDoTransferStateToStreamTask(pTask);
|
||||
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
||||
} else {
|
||||
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask != NULL) {
|
||||
// halt the related stream sink task
|
||||
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
|
||||
pStreamTask->id.idStr, tstrerror(code));
|
||||
|
@ -432,9 +438,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
} else {
|
||||
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||
}
|
||||
|
||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||
streamTaskSendCheckpointReq(pStreamTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -847,8 +847,8 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
|
|||
pDst->chkpointTransId = pSrc->chkpointTransId;
|
||||
}
|
||||
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
||||
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||
|
@ -862,6 +862,10 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
|
||||
}
|
||||
|
||||
void streamTaskResume(SStreamTask* pTask) {
|
||||
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
|
Loading…
Reference in New Issue