fix(stream): fix dead lock.

This commit is contained in:
Haojun Liao 2024-04-28 18:35:00 +08:00
parent b990632e8d
commit 6c93fe5593
1 changed files with 15 additions and 8 deletions

View File

@ -1710,21 +1710,29 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
STaskCheckInfo* pInfo = &(*ppTask)->taskCheckInfo;
int64_t now = taosGetTimestampMs();
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->startTs, now, false);
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
if (HAS_RELATED_FILLHISTORY_TASK(*ppTask)) {
STaskId hId = (*ppTask)->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, pInfo->startTs, now, false);
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
@ -1732,6 +1740,5 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamMetaRUnLock(pMeta);
return code;
}