Merge pull request #25700 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
845fcfa247
|
@ -707,6 +707,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId,
|
|||
int64_t endTs, bool ready);
|
||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
|
||||
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||
int64_t startTs);
|
||||
void streamMetaRLock(SStreamMeta* pMeta);
|
||||
|
|
|
@ -38,6 +38,56 @@ static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfR
|
|||
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
|
||||
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
|
||||
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||
int64_t* oldStage) {
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
||||
ASSERT(pInfo != NULL);
|
||||
|
||||
*oldStage = pInfo->stage;
|
||||
const char* id = pTask->id.idStr;
|
||||
if (stage == -1) {
|
||||
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
|
||||
upstreamTaskId, vgId, stage);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pInfo->stage == -1) {
|
||||
pInfo->stage = stage;
|
||||
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
|
||||
upstreamTaskId, vgId, stage);
|
||||
}
|
||||
|
||||
if (pInfo->stage < stage) {
|
||||
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
|
||||
", prev:%" PRId64,
|
||||
id, upstreamTaskId, vgId, stage, pInfo->stage);
|
||||
// record the checkpoint failure id and sent to mnode
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
||||
if (pInfo->stage != stage) {
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
return TASK_UPSTREAM_NEW_STAGE;
|
||||
} else if (pTask->status.downstreamReady != 1) {
|
||||
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
|
||||
return TASK_DOWNSTREAM_NOT_READY;
|
||||
} else {
|
||||
return TASK_DOWNSTREAM_READY;
|
||||
}
|
||||
}
|
||||
|
||||
// check status
|
||||
void streamTaskSendCheckMsg(SStreamTask* pTask) {
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
|
@ -184,14 +234,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
|||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||
}
|
||||
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
||||
}
|
||||
streamMetaAddFailedTaskSelf(pTask, now);
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
|
||||
ASSERT(left > 0);
|
||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||
|
|
|
@ -1381,12 +1381,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
|
||||
// todo: may be we should find the related fill-history task and set it failed.
|
||||
// todo: use hashTable instead
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
|
||||
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1414,12 +1413,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||
code = ret;
|
||||
|
||||
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||
}
|
||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
@ -1491,12 +1485,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
|||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
|
||||
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
|
||||
streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
// todo: may be we should find the related fill-history task and set it failed.
|
||||
|
||||
// fill-history task can only be launched by related stream tasks.
|
||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
|
@ -1508,13 +1500,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
|||
|
||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||
|
||||
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||
}
|
||||
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
|
||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
@ -1652,6 +1639,17 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|||
return code;
|
||||
}
|
||||
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||
int64_t startTs) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
|
|
@ -130,56 +130,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||
int64_t* oldStage) {
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
||||
ASSERT(pInfo != NULL);
|
||||
|
||||
*oldStage = pInfo->stage;
|
||||
const char* id = pTask->id.idStr;
|
||||
if (stage == -1) {
|
||||
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
|
||||
upstreamTaskId, vgId, stage);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pInfo->stage == -1) {
|
||||
pInfo->stage = stage;
|
||||
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
|
||||
upstreamTaskId, vgId, stage);
|
||||
}
|
||||
|
||||
if (pInfo->stage < stage) {
|
||||
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
|
||||
", prev:%" PRId64,
|
||||
id, upstreamTaskId, vgId, stage, pInfo->stage);
|
||||
// record the checkpoint failure id and sent to mnode
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
||||
if (pInfo->stage != stage) {
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
return TASK_UPSTREAM_NEW_STAGE;
|
||||
} else if (pTask->status.downstreamReady != 1) {
|
||||
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
|
||||
return TASK_DOWNSTREAM_NOT_READY;
|
||||
} else {
|
||||
return TASK_DOWNSTREAM_READY;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
|
|
Loading…
Reference in New Issue