Merge pull request #26700 from taosdata/fix/create_tb
fix(stream): fix race condition when starting check downstream status
This commit is contained in:
commit
2f0768e12d
|
@ -365,6 +365,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
code = streamTrySchedExec(pTask);
|
code = streamTrySchedExec(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
taosArrayDestroy(pTaskList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamMutexLock(&pInfo->checkInfoLock);
|
streamMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
// drop procedure already started, not start check downstream now
|
||||||
|
ETaskStatus s = streamTaskGetStatus(pTask).state;
|
||||||
|
if (s == TASK_STATUS__DROPPING) {
|
||||||
|
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
|
@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
streamMetaRLock(pMeta);
|
streamMetaRLock(pMeta);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
|
// to make sure check status will not start the check downstream status when we start to check timerActive count.
|
||||||
|
streamMutexLock(&pTask->taskCheckInfo.checkInfoLock);
|
||||||
timerActive = (*ppTask)->status.timerActive;
|
timerActive = (*ppTask)->status.timerActive;
|
||||||
|
streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock);
|
||||||
}
|
}
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue