fix(stream): add lock, and fix race condition.

This commit is contained in:
Haojun Liao 2024-04-10 18:42:44 +08:00
parent f23a8a37bc
commit e261023ee6
2 changed files with 10 additions and 7 deletions

View File

@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
} }
// extract the required source task for a given stream, identified by streamId // extract the required source task for a given stream, identified by streamId
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i); STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
if (pId->streamId != streamId) { if (pId->streamId != streamId) {
@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
walCloseReader(pReader); walCloseReader(pReader);
} }
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
bool ret = false; bool ret = false;
// double check taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
taosThreadMutexLock(&pTask->lock); pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { ret = true;
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
} }
taosThreadMutexUnlock(&pTask->lock);
return ret; return ret;
} }