diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index af0845aa45..1f16659499 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -52,6 +52,7 @@ enum { TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint + TASK_STATUS__PAUSE, }; enum { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8a5e74327b..20f7bbd17f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1289,7 +1289,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = pReq; action.contLen = sizeof(SVPauseStreamTaskReq); - action.msgType = TDMT_STREAM_TASK_RESUME; + action.msgType = TDMT_STREAM_TASK_PAUSE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 49f9d39afb..56fb47eaf1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1207,13 +1207,24 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - // todo(liuyao) call task api + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + if (pTask) { + tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + } return 0; } int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - // todo(liuyao) call task api + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + if (pTask) { + tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + streamSetStatusNormal(pTask); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + tqStreamTasksScanWal(pTq); + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 7109e0acc9..403a08a767 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || - status == TASK_STATUS__WAIT_DOWNSTREAM) { + status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/tests/script/tsim/stream/fillIntervalPartitionBy.sim b/tests/script/tsim/stream/fillIntervalPartitionBy.sim index 43cbc7adcf..6afd0febd1 100644 --- a/tests/script/tsim/stream/fillIntervalPartitionBy.sim +++ b/tests/script/tsim/stream/fillIntervalPartitionBy.sim @@ -76,6 +76,21 @@ sql select * from streamt5 order by group_id, ts; if $rows != 14 then print ====streamt5=rows5=$rows + print $data00,$data01,$data02,$data03 + print $data10,$data11,$data12,$data13 + print $data20,$data21,$data22,$data23 + print $data30,$data31,$data32,$data33 + print $data40,$data41,$data42,$data43 + print $data50,$data51,$data52,$data53 + print $data60,$data61,$data62,$data63 + print $data70,$data71,$data72,$data73 + print $data80,$data81,$data82,$data83 + print $data90,$data91,$data92,$data93 + print $data[10][0],$data[10][1],$data[10][2],$data[10][3] + print $data[11][0],$data[11][1],$data[10][2],$data[10][3] + print $data[12][0],$data[12][1],$data[10][2],$data[10][3] + print $data[13][0],$data[13][1],$data[10][2],$data[10][3] + print $data[14][0],$data[14][1],$data[10][2],$data[10][3] goto loop2 endi