fix(stream): check the task status before starting init, and do some internal refactoring.

This commit is contained in:
Haojun Liao 2024-08-02 16:54:08 +08:00
parent 9feacd983f
commit 895a9a1f3d
6 changed files with 118 additions and 59 deletions

View File

@ -641,12 +641,13 @@ bool streamTaskShouldPause(const SStreamTask* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus);
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId);
SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status); const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask); void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask);
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
const char* streamTaskGetExecType(int32_t type);
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);

View File

@ -355,43 +355,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code; return code;
} }
/** // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
* All down stream tasks have successfully completed the check point task. static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
* Current stream task is allowed to start to do checkpoint things in ASYNC model. int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
*/ const char* id, int32_t* pNotReady, int32_t* pTransId) {
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, bool received = false;
int32_t downstreamTaskId) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
const char* id = pTask->id.idStr;
bool received = false;
int32_t total = streamTaskGetNumOfDownstream(pTask);
ASSERT(total > 0);
// 1. not in checkpoint status now
SStreamTaskState pStat = streamTaskGetStatus(pTask);
if (pStat.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
") from task:0x%x, expired and discard ",
id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
return -1;
}
streamMutexLock(&pInfo->lock);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
if (p == NULL) { if (p == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -403,27 +375,69 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
if (received) { if (received) {
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
numOfDownstream);
} else { } else {
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
.downstreamTaskId = downstreamTaskId, .downstreamTaskId = downstreamTaskId,
.checkpointId = pInfo->activeId, .checkpointId = pInfo->activeId,
.transId = pInfo->transId, .transId = pInfo->transId,
.streamId = pTask->id.streamId, .streamId = streamId,
.downstreamNodeId = downstreamNodeId}; .downstreamNodeId = downstreamNodeId};
(void)taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
if (p == NULL) {
stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
return terrno;
}
} }
int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
int32_t transId = pInfo->transId; *pTransId = pInfo->transId;
return 0;
}
/**
 * Handle a checkpoint-ready message sent by one downstream task.
 *
 * The message is discarded when this task is not in checkpoint status, or when the
 * checkpointId it carries is older than the task's checkpointId or differs from the
 * active checkpointId. Otherwise processCheckpointReadyHelp() is invoked while holding
 * the active checkpoint info lock; when it succeeds and reports that no downstream task
 * is left outstanding, appendCheckpointIntoInputQ() is called for the current task.
 *
 * @param pTask            task that receives the checkpoint-ready message
 * @param checkpointId     checkpoint id carried by the message
 * @param downstreamNodeId node id of the downstream task that sent the message
 * @param downstreamTaskId id of the downstream task that sent the message
 * @return 0 on success; TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task is not in
 *         checkpoint status; TSDB_CODE_INVALID_MSG for an expired/mismatched message;
 *         otherwise the code returned by processCheckpointReadyHelp()
 */
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
                                        int32_t downstreamTaskId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;
  int32_t                total = streamTaskGetNumOfDownstream(pTask);
  int32_t                numOfNotReady = 0;
  int32_t                transId = 0;

  ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG));

  // 1. the task must be in checkpoint status to accept this message
  SStreamTaskState state = streamTaskGetStatus(pTask);
  if (state.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", idStr, state.name, downstreamTaskId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 2. drop expired or mismatched checkpoint-ready msg
  if (pTask->chkInfo.checkpointId > checkpointId || pActive->activeId != checkpointId) {
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
            ") from task:0x%x, expired and discard",
            idStr, state.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // 3. update the recv list while holding the lock of the active checkpoint info
  streamMutexLock(&pActive->lock);
  int32_t ret = processCheckpointReadyHelp(pActive, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId,
                                           idStr, &numOfNotReady, &transId);
  streamMutexUnlock(&pActive->lock);

  // helper failed, or some downstream tasks have not responded yet: nothing more to do
  if (ret != 0 || numOfNotReady != 0) {
    return ret;
  }

  stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", idStr);
  (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);

  return ret;
}
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
@ -1034,8 +1048,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p == NULL) { if (p == NULL) {
streamMutexUnlock(&pInfo->lock); continue;
return num;
} }
if (p->recved) { if (p->recved) {

View File

@ -1464,14 +1464,16 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
} }
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = 0; int32_t code = 0;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
SStreamTask* pTask = NULL;
bool continueExec = true;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
(void)streamMetaAddFailedTask(pMeta, streamId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
@ -1479,10 +1481,26 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
// fill-history task can only be launched by related stream tasks. // fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo; STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__UNINIT) {
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
continueExec = false;
} else {
continueExec = true;
}
streamMutexUnlock(&pTask->lock);
if (!continueExec) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
// avoid initialization and destroy running concurrently. // avoid initialization and destroy running concurrently.
@ -1501,7 +1519,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
tstrerror(code));
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
} }
} }

View File

@ -48,14 +48,15 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
terrstr()); terrstr(terrno));
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
if (streamId != 0) { if (streamId != 0) {
stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
streamTaskGetExecType(execType));
} else { } else {
stDebug("vgId:%d create msg to exec, type:%d", vgId, execType); stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
} }
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;

View File

@ -1150,3 +1150,24 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList); taosArrayClear(pInfo->pCheckpointReadyRecvList);
} }
/**
 * Translate a stream exec type code into a short label for logging.
 *
 * @param type one of the STREAM_EXEC_T_* codes
 * @return a constant string naming the exec type, or "invalid-exec-type" when the
 *         code matches none of the known STREAM_EXEC_T_* values
 */
const char* streamTaskGetExecType(int32_t type) {
  // start from the fallback label; a matching case below overrides it
  const char* label = "invalid-exec-type";

  switch (type) {
    case STREAM_EXEC_T_EXTRACT_WAL_DATA:
      label = "scan-wal-file";
      break;
    case STREAM_EXEC_T_START_ALL_TASKS:
      label = "start-all-tasks";
      break;
    case STREAM_EXEC_T_START_ONE_TASK:
      label = "start-one-task";
      break;
    case STREAM_EXEC_T_RESTART_ALL_TASKS:
      label = "restart-all-tasks";
      break;
    case STREAM_EXEC_T_STOP_ALL_TASKS:
      label = "stop-all-tasks";
      break;
    case STREAM_EXEC_T_RESUME_TASK:
      label = "resume-task-from-idle";
      break;
    case STREAM_EXEC_T_ADD_FAILED_TASK:
      label = "record-start-failed-task";
      break;
    default:
      break;
  }

  return label;
}

View File

@ -322,12 +322,11 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
if (pTrans->attachEvent.event != 0) { if (pTrans->attachEvent.event != 0) {
code = attachWaitedEvent(pTask, &pTrans->attachEvent); code = attachWaitedEvent(pTask, &pTrans->attachEvent);
streamMutexUnlock(&pTask->lock);
if (code) { if (code) {
return code; return code;
} }
streamMutexUnlock(&pTask->lock);
while (1) { while (1) {
// wait for the task to be here // wait for the task to be here
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
@ -557,6 +556,11 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
} }
/**
 * Map a task status value to its printable name.
 *
 * The status is used as an index into StreamTaskStatusList. A value outside the table
 * yields an empty string instead of an out-of-bounds read.
 *
 * @param status task status to translate
 * @return the name stored in StreamTaskStatusList for this status, or "" when the
 *         status is out of range
 */
const char* streamTaskGetStatusStr(ETaskStatus status) {
  int32_t index = status;

  // valid indexes are [0, len - 1]; index == len is already one past the last entry,
  // so the upper bound must be checked with '>=' (assumes tListLen yields the element count)
  if (index < 0 || index >= tListLen(StreamTaskStatusList)) {
    return "";
  }

  return StreamTaskStatusList[index].name;
}