fix(stream): fix deadlock.

This commit is contained in:
Haojun Liao 2024-02-18 17:05:00 +08:00
parent c4f9bee629
commit 245f0ef806
2 changed files with 6 additions and 2 deletions

View File

@ -28,8 +28,8 @@ static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDurat
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);

View File

@ -415,7 +415,6 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
ASSERT(0);
taosMsleep(100);
} else {
// no active event trans exists, handle this event directly
@ -485,6 +484,9 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
// on success callback, add into lock if necessary, or maybe we should add an option for this?
pTrans->pSuccAction(pTask);
taosThreadMutexUnlock(&pTask->lock);
// todo: add parameter to control lock
// after handling the callback function assigned by invoker, go on handling the waiting tasks
if (callbackFn != NULL) {
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
@ -493,6 +495,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
}
taosThreadMutexLock(&pTask->lock);
// tasks in waiting list
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);