fix(stream): fix race condition.

This commit is contained in:
Haojun Liao 2023-12-20 23:12:59 +08:00
parent ad5d78a6ca
commit 32ce4b6a4c
1 changed files with 4 additions and 11 deletions

View File

@ -48,10 +48,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask); static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t streamTaskSetReady(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
SStreamTaskId id = pTask->id;
int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;
char* p = NULL; char* p = NULL;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
@ -70,13 +66,6 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
int64_t el = (pTask->execInfo.start - pTask->execInfo.init); int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p); pTask->id.idStr, numOfDowns, el, p);
taosThreadMutexUnlock(&pTask->lock);
// todo: fix it, to avoid deadlock in: tqStreamTaskProcessUpdateReq
streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true);
taosThreadMutexLock(&pTask->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -392,6 +381,10 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
} }
streamTaskOnHandleEventSuccess(pTask->status.pSM, event); streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
} }
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {