refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-09 14:06:25 +08:00
parent db848f7e6b
commit 6721ec3cd8
4 changed files with 68 additions and 76 deletions

View File

@ -707,6 +707,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId,
int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaRLock(SStreamMeta* pMeta);

View File

@ -38,6 +38,56 @@ static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfR
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
*oldStage = pInfo->stage;
const char* id = pTask->id.idStr;
if (stage == -1) {
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, vgId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
upstreamTaskId, vgId, stage);
}
if (pInfo->stage < stage) {
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
}
if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
return TASK_DOWNSTREAM_NOT_READY;
} else {
return TASK_DOWNSTREAM_READY;
}
}
// check status
void streamTaskSendCheckMsg(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
@ -184,14 +234,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
int32_t startTs = pTask->execInfo.checkTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
streamMetaAddFailedTaskSelf(pTask, now);
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,

View File

@ -1381,12 +1381,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
// todo: may be we should find the related fill-history task and set it failed.
// todo: use hashTable instead
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
@ -1414,12 +1413,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
}
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
streamMetaReleaseTask(pMeta, pTask);
@ -1491,12 +1485,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// todo: may be we should find the related fill-history task and set it failed.
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
@ -1508,13 +1500,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
}
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
streamMetaReleaseTask(pMeta, pTask);
@ -1652,6 +1639,17 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
}
}
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs) {
const char* id = pTask->id.idStr;

View File

@ -130,56 +130,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
*oldStage = pInfo->stage;
const char* id = pTask->id.idStr;
if (stage == -1) {
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, vgId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
upstreamTaskId, vgId, stage);
}
if (pInfo->stage < stage) {
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
}
if (pInfo->stage != stage) {
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
return TASK_DOWNSTREAM_NOT_READY;
} else {
return TASK_DOWNSTREAM_READY;
}
}
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;