refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-12 19:08:46 +08:00
parent bc7434e2b5
commit 00e2bdec23
4 changed files with 120 additions and 70 deletions

View File

@ -872,7 +872,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);

View File

@ -526,7 +526,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false);
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
@ -539,7 +539,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
return code;
}
@ -553,13 +553,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false);
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
}
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);

View File

@ -1466,7 +1466,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
continue;
}
@ -1487,7 +1487,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true);
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
@ -1497,10 +1497,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false);
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
}
}
@ -1519,7 +1519,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
@ -1538,10 +1538,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, pInfo->init, pInfo->start, false);
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
}
}

View File

@ -26,6 +26,7 @@
typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta;
STaskId id;
STaskId hTaskId;
} SLaunchHTaskInfo;
typedef struct STaskRecheckInfo {
@ -43,7 +44,8 @@ typedef struct STaskInitTs {
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId,
int32_t hTaskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
@ -394,7 +396,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
// start the related fill-history task, when current task is ready
// not invoke in success callback due to the deadlock.
@ -492,12 +494,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t startTs = pTask->execInfo.init;
int64_t now = taosGetTimestampMs();
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
@ -731,21 +733,27 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
static void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
int64_t now = taosGetTimestampMs();
streamMetaWLock(pMeta);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask) {
if (ppTask != NULL) {
ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(*ppTask)) {
char* p = streamTaskGetStatus(*ppTask)->name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
streamMetaWUnLock(pMeta);
// record the related fill-history task failed
STaskId* pHTaskId = &(*ppTask)->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pHTaskId->streamId, pHTaskId->taskId, 0, now, false);
taosMemoryFree(pInfo);
streamMetaWUnLock(pMeta);
return;
}
}
@ -764,8 +772,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
streamMetaReleaseTask(pMeta, pTask);
streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false);
streamMetaReleaseTask(pMeta, pTask);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
@ -798,91 +807,132 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
streamMetaReleaseTask(pMeta, pTask);
}
} else {
stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task",
streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:0x%x failed to load fill-history task, it may have been destroyed, not launch fill-history task",
(int32_t)pInfo->id.taskId);
}
taosMemoryFree(pInfo);
}
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId,
int32_t hTaskId) {
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->id.taskId = taskId;
pInfo->id.streamId = streamId;
pInfo->id.taskId = taskId;
pInfo->hTaskId.streamId = hStreamId;
pInfo->hTaskId.taskId = hTaskId;
pInfo->pMeta = pMeta;
return pInfo;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int64_t streamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY) {
STaskExecStatisInfo* pInfo = &pTask->execInfo;
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", pTask->id.idStr,
pTask->hTaskInfo.id.streamId, hTaskId, pStatus->name);
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, hTaskId, pInfo->init, pInfo->start, false);
return -1;// todo set the correct error code
} else {
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->hTaskInfo.id.streamId, hTaskId);
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId);
if (pInfo == NULL) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
return terrno;
}
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (pHTask == NULL) {
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId,
hTaskId);
// set the launch time info
streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
if (pInfo == NULL) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr);
// check for the timer
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);
if (pTask->hTaskInfo.pTimer == NULL) {
ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr,
pTask->status.timerActive);
taosMemoryFree(pInfo);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
return terrno;
}
streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);
if (pTask->hTaskInfo.pTimer == NULL) {
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
pTask->status.timerActive);
taosMemoryFree(pInfo);
} else {
ASSERT(ref >= 1);
stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref);
ASSERT(ref >= 1);
stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref);
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) {
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer);
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
}
if ((*pHTask)->status.downstreamReady == 1) {
stDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr);
} else {
checkFillhistoryTaskStatus(pTask, *pHTask);
}
return TSDB_CODE_SUCCESS;
return handleNotBuiltFillHistoryTask(pTask);
}
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
@ -1114,7 +1164,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
}
}
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};