refactor: do some internal refactor.
This commit is contained in:
parent
44208925f9
commit
98758862cf
|
@ -866,7 +866,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
|
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
|
||||||
const char* idStr = pTask->id.idStr;
|
const char* idStr = pTask->id.idStr;
|
||||||
|
@ -894,8 +894,7 @@ static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pTask->hTaskInfo.pTimer == NULL) {
|
if (pTask->hTaskInfo.pTimer == NULL) {
|
||||||
ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr,
|
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
|
||||||
pTask->status.timerActive);
|
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
||||||
|
@ -940,10 +939,10 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
if (pHTask != NULL) {
|
if (pHTask != NULL) { // it is already added into stream meta store.
|
||||||
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
|
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
|
||||||
if (pHisTask == NULL) {
|
if (pHisTask == NULL) {
|
||||||
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr);
|
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
||||||
} else {
|
} else {
|
||||||
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
|
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
|
||||||
|
@ -957,9 +956,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
return launchNotBuiltFillHistoryTask(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return handleNotBuiltFillHistoryTask(pTask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
|
|
Loading…
Reference in New Issue