refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-13 00:10:47 +08:00
parent 00e2bdec23
commit 44208925f9
1 changed files with 69 additions and 42 deletions

View File

@ -730,6 +730,42 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
}
static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
pHTaskInfo->id.taskId = 0;
pHTaskInfo->id.streamId = 0;
}
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
if (streamTaskShouldStop(pTask)) { // record the failure
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId,
pInfo->hTaskId.taskId, ref);
streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
taosMemoryFree(pInfo);
} else {
char* p = streamTaskGetStatus(pTask)->name;
int32_t hTaskId = pHTaskInfo->id.taskId;
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
}
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
@ -738,28 +774,35 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
streamMetaWLock(pMeta);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask != NULL) {
if (ppTask == NULL || *ppTask == NULL) {
stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch",
(int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId);
streamMetaWUnLock(pMeta);
// already dropped, no need to set the failure info into the stream task meta.
taosMemoryFree(pInfo);
return;
}
if (streamTaskShouldStop(*ppTask)) {
ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(*ppTask)) {
char* p = streamTaskGetStatus(*ppTask)->name;
char* p = streamTaskGetStatus(*ppTask)->name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
streamMetaWUnLock(pMeta);
streamMetaWUnLock(pMeta);
// record the related fill-history task failed
STaskId* pHTaskId = &(*ppTask)->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pHTaskId->streamId, pHTaskId->taskId, 0, now, false);
taosMemoryFree(pInfo);
return;
}
// record the related fill-history task failed
streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
taosMemoryFree(pInfo);
return;
}
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId);
streamMetaWUnLock(pMeta);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
if (pTask != NULL) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -771,34 +814,18 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false);
streamMetaReleaseTask(pMeta, pTask);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
pHTaskInfo->id.taskId = 0;
pHTaskInfo->id.streamId = 0;
noRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(pTask))) {
char* p = streamTaskGetStatus(pTask)->name;
int32_t hTaskId = pHTaskInfo->id.taskId;
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
if (pHTask == NULL) {
doRetryLaunchFillHistoryTask(pTask, pInfo, now);
streamMetaReleaseTask(pMeta, pTask);
return;
}
if (pHTask != NULL) {
} else {
checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
@ -807,15 +834,15 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
}
} else {
streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:0x%x failed to load fill-history task, it may have been destroyed, not launch fill-history task",
(int32_t)pInfo->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
} else {
streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d",
(int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId, ref);
}
taosMemoryFree(pInfo);