refactor(stream): do checkpoint for each task.
This commit is contained in:
parent
0b4946eefb
commit
17bd6badec
|
@ -1702,13 +1702,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle failure to reset from checkpoint procedure
|
// Downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
||||||
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
|
||||||
if (pTask->status.downstreamReady != 1) {
|
if (pTask->status.downstreamReady != 1) {
|
||||||
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
||||||
pTask->checkpointingId = req.checkpointId;
|
pTask->checkpointingId = req.checkpointId;
|
||||||
|
|
||||||
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
||||||
", set it failure",
|
", set it failure",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1735,6 +1734,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo: already in checkpoint status, return error
|
||||||
streamProcessCheckpointSourceReq(pTask, &req);
|
streamProcessCheckpointSourceReq(pTask, &req);
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
|
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId);
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
||||||
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
pTask->execInfo.checkpoint += 1;
|
pTask->execInfo.checkpoint += 1;
|
||||||
|
|
||||||
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
||||||
// already.
|
|
||||||
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -169,9 +168,8 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||||
int64_t checkpointId = pDataBlock->info.version;
|
int64_t checkpointId = pDataBlock->info.version;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
const char* id = pTask->id.idStr;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
// set task status
|
// set task status
|
||||||
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
|
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
|
||||||
|
@ -197,15 +195,15 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
// todo fix race condition: set the status and append checkpoint block
|
// todo fix race condition: set the status and append checkpoint block
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
int8_t type = pTask->outputInfo.type;
|
||||||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||||
continueDispatchCheckpointBlock(pBlock, pTask);
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
} else { // only one task exists, no need to dispatch downstream info
|
} else { // only one task exists, no need to dispatch downstream info
|
||||||
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
|
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
|
||||||
streamProcessCheckpointReadyMsg(pTask);
|
streamProcessCheckpointReadyMsg(pTask);
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
}
|
}
|
||||||
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||||
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
|
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
|
||||||
if (pTask->chkInfo.startTs == 0) {
|
if (pTask->chkInfo.startTs == 0) {
|
||||||
|
@ -231,11 +229,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
id, num);
|
id, num);
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
streamTaskBuildCheckpoint(pTask);
|
streamTaskBuildCheckpoint(pTask);
|
||||||
} else {
|
} else { // source & agg tasks need to forward the checkpoint msg downwards
|
||||||
stDebug(
|
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id,
|
||||||
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
num);
|
||||||
"downstream",
|
|
||||||
id, num);
|
|
||||||
|
|
||||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||||
// can start local checkpoint procedure
|
// can start local checkpoint procedure
|
||||||
|
@ -282,48 +278,47 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
||||||
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
// for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
// STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
// SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
if (ppTask == NULL) {
|
// if (ppTask == NULL) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
|
||||||
SStreamTask* p = *ppTask;
|
// SStreamTask* p = *ppTask;
|
||||||
if (p->info.fillHistory == 1) {
|
if (p->info.fillHistory == 1) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
|
||||||
|
|
||||||
p->chkInfo.checkpointId = p->checkpointingId;
|
|
||||||
streamTaskClearCheckInfo(p);
|
|
||||||
|
|
||||||
char* str = NULL;
|
|
||||||
streamTaskGetStatus(p, &str);
|
|
||||||
|
|
||||||
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
return -1;
|
|
||||||
} else { // save the task
|
|
||||||
streamMetaSaveTask(pMeta, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug(
|
|
||||||
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
|
|
||||||
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
|
|
||||||
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
|
|
||||||
str);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
||||||
|
|
||||||
|
p->chkInfo.checkpointId = p->checkpointingId;
|
||||||
|
streamTaskClearCheckInfo(p);
|
||||||
|
|
||||||
|
char* str = NULL;
|
||||||
|
streamTaskGetStatus(p, &str);
|
||||||
|
|
||||||
|
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
return -1;
|
||||||
|
} else { // save the task
|
||||||
|
streamMetaSaveTask(pMeta, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug(
|
||||||
|
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
|
||||||
|
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
|
||||||
|
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
|
||||||
|
str);
|
||||||
|
|
||||||
code = streamMetaCommit(pMeta);
|
code = streamMetaCommit(pMeta);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
|
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
|
||||||
|
@ -341,24 +336,24 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
|
|
||||||
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
||||||
ASSERT(remain >= 0);
|
// ASSERT(remain >= 0);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
|
||||||
if (remain == 0) { // all tasks are ready
|
// if (remain == 0) { // all tasks are ready
|
||||||
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
|
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId);
|
||||||
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId);
|
||||||
stInfo(
|
stInfo(
|
||||||
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
|
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
|
||||||
"checkpointId:%" PRId64,
|
"checkpointId:%" PRId64,
|
||||||
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
|
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
|
||||||
} else {
|
// } else {
|
||||||
stInfo(
|
// stInfo(
|
||||||
"vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec "
|
// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec "
|
||||||
"not ready:%d/%d",
|
// "not ready:%d/%d",
|
||||||
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks);
|
// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// send check point response to upstream task
|
// send check point response to upstream task
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
@ -368,6 +363,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// record the failure checkpoint id
|
||||||
|
pTask->chkInfo.failedId = pTask->checkpointingId;
|
||||||
|
|
||||||
// todo: let's retry send rsp to upstream/mnode
|
// todo: let's retry send rsp to upstream/mnode
|
||||||
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
||||||
pTask->checkpointingId, tstrerror(code));
|
pTask->checkpointingId, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue