diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ba5219cf20..32d8ce72f0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -837,6 +837,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); +bool streamTaskIsSinkTask(const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 358af0b083..eb5efe07f2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -813,8 +813,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); + tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + p, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c69fe2a79c..1eb66a82ab 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -713,7 +713,13 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { *pStatus = pState->name; } - return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); + // pause & halt will still run for sink tasks. + if (streamTaskIsSinkTask(pTask)) { + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK || + st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT); + } else { + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); + } } static void doStreamExecTaskHelper(void* param, void* tmrId) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 90823cd937..78929c365e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -154,7 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *numOfBlocks = 0; *blockSize = 0; - // todo remove it // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 881f10502c..9e981a79c7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -848,3 +848,7 @@ void streamTaskResume(SStreamTask* pTask) { stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name); } } + +bool streamTaskIsSinkTask(const SStreamTask* pTask) { + return pTask->info.taskLevel == TASK_LEVEL__SINK; +}