Merge pull request #27255 from taosdata/fix/create_tb

fix(stream): check status before start timer.
This commit is contained in:
Haojun Liao 2024-08-16 00:11:29 +08:00 committed by GitHub
commit 06fea91b55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 11 deletions

View File

@ -762,18 +762,27 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
code = sendDispatchMsg(pTask, pTask->msgInfo.pData); code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
streamMutexLock(&pTask->msgInfo.lock); // todo: secure the timerActive and start timer in after lock pTask->lock
if (pTask->msgInfo.inMonitor == 0) { streamMutexLock(&pTask->lock);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); bool shouldStop = streamTaskShouldStop(pTask);
stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, streamMutexUnlock(&pTask->lock);
ref, tstrerror(code));
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
pTask->msgInfo.inMonitor = 1;
} else {
stDebug("s-task:%s already in dispatch monitor tmr", id);
}
streamMutexUnlock(&pTask->msgInfo.lock); if (shouldStop) {
stDebug("s-task:%s in stop/dropping status, not start dispatch monitor tmr", id);
} else {
streamMutexLock(&pTask->msgInfo.lock);
if (pTask->msgInfo.inMonitor == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
ref, tstrerror(code));
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
pTask->msgInfo.inMonitor = 1;
} else {
stDebug("s-task:%s already in dispatch monitor tmr", id);
}
streamMutexUnlock(&pTask->msgInfo.lock);
}
// this block can not be deleted until it has been sent to downstream task successfully. // this block can not be deleted until it has been sent to downstream task successfully.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;