fix(stream): fix the deadlock.

This commit is contained in:
Haojun Liao 2024-01-03 22:29:46 +08:00
parent 3fea717373
commit 74df121cf5
1 changed files with 5 additions and 2 deletions

View File

@ -734,8 +734,8 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) {
static int32_t schedTaskInFuture(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d",
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr,
pTask->status.schedIdleTime, ref);
// add one ref count for task
SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
@ -761,6 +761,9 @@ int32_t streamResumeTask(SStreamTask* pTask) {
if (pTask->status.schedIdleTime > 0) {
stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime);
schedTaskInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
return 0;
} else {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {