|
|
|
@ -94,12 +94,12 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
|
|
|
|
|
|
|
|
|
|
// streamAlignCheckpoint: update the per-task checkpoint alignment counter and return its new value.
//
// NOTE(review): this span is diff residue. Two statements appear twice, once per revision:
// the counter is `pTask->checkpointAlignCnt` in one form and `pTask->chkInfo.downstreamAlignNum`
// in the other. Only one line of each pair belongs to a given revision.
//
// Steps visible here:
//   1. `num` is the size of pTask->upstreamInfo.pList.
//   2. The counter is compare-exchanged against 0 with the new value `num`; when `old` is 0 a
//      debug line reports the initial upstream count.
//   3. The counter is atomically decremented by 1 and the result of atomic_sub_fetch_32 is returned.
// Presumably atomic_val_compare_exchange_32 yields the prior value and atomic_sub_fetch_32 the
// value after the subtraction - confirm against their definitions.
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
|
|
|
|
|
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
|
|
|
|
// one revision: counter stored directly on the task
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
|
|
|
|
|
// other revision: counter stored inside pTask->chkInfo
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
|
|
|
|
|
if (old == 0) {
|
|
|
|
|
stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
// the two return lines below are the same statement in the two revisions
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
|
|
|
|
|
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
|
|
|
|
@ -117,7 +117,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pBlock->info.type = STREAM_CHECKPOINT;
|
|
|
|
|
pBlock->info.version = pTask->checkpointingId;
|
|
|
|
|
pBlock->info.version = pTask->chkInfo.checkpointingId;
|
|
|
|
|
pBlock->info.rows = 1;
|
|
|
|
|
pBlock->info.childId = pTask->info.selfChildId;
|
|
|
|
|
|
|
|
|
@ -140,8 +140,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
|
|
|
|
// 1. set the task status to prepare for the checkpoint; no data is allowed to be put into inputQ.
|
|
|
|
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
|
|
|
|
|
|
|
|
|
pTask->checkpointingId = pReq->checkpointId;
|
|
|
|
|
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
|
|
|
|
pTask->chkInfo.checkpointingId = pReq->checkpointId;
|
|
|
|
|
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
|
|
|
|
pTask->chkInfo.startTs = taosGetTimestampMs();
|
|
|
|
|
pTask->execInfo.checkpoint += 1;
|
|
|
|
|
|
|
|
|
@ -173,7 +173,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|
|
|
|
|
|
|
|
|
// set task status
|
|
|
|
|
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
|
|
|
|
|
pTask->checkpointingId = checkpointId;
|
|
|
|
|
pTask->chkInfo.checkpointingId = checkpointId;
|
|
|
|
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
|
|
|
|
@ -181,17 +181,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
{ // todo: remove this when the pipeline checkpoint generating is used.
|
|
|
|
|
SStreamMeta* pMeta = pTask->pMeta;
|
|
|
|
|
streamMetaWLock(pMeta);
|
|
|
|
|
|
|
|
|
|
if (pMeta->chkptNotReadyTasks == 0) {
|
|
|
|
|
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo fix race condition: set the status and append checkpoint block
|
|
|
|
|
int32_t taskLevel = pTask->info.taskLevel;
|
|
|
|
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
|
|
|
@ -200,7 +189,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|
|
|
|
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
|
|
|
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
|
|
|
|
} else { // only one task exists, no need to dispatch downstream info
|
|
|
|
|
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
|
|
|
|
|
atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
|
|
|
|
|
streamProcessCheckpointReadyMsg(pTask);
|
|
|
|
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
|
|
|
|
}
|
|
|
|
@ -235,7 +224,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|
|
|
|
|
|
|
|
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
|
|
|
|
// can start local checkpoint procedure
|
|
|
|
|
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
|
|
|
|
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
|
|
|
|
|
|
|
|
|
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
|
|
|
|
// already. And then, dispatch check point msg to all downstream tasks
|
|
|
|
@ -254,7 +243,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
|
|
|
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
|
|
|
|
|
|
|
|
|
|
// only when all downstream tasks have sent the checkpoint rsp can we start the checkpoint procedure for the agg task
|
|
|
|
|
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
|
|
|
|
|
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
|
|
|
|
|
ASSERT(notReady >= 0);
|
|
|
|
|
|
|
|
|
|
if (notReady == 0) {
|
|
|
|
@ -270,91 +259,90 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// streamTaskClearCheckInfo: reset the task's in-progress checkpoint bookkeeping to zero and
// then call streamTaskOpenAllUpstreamInput(pTask).
//
// NOTE(review): this span is diff residue. The same field appears under two spellings, one
// per revision: `pTask->checkpointingId` / `pTask->chkInfo.checkpointingId`,
// `pTask->checkpointNotReadyTasks` / `pTask->chkInfo.checkpointNotReadyTasks`, and
// `pTask->checkpointAlignCnt` / `pTask->chkInfo.downstreamAlignNum`.
//
// The function takes no lock itself; the callers visible in this file differ in whether they
// hold p->lock around the call.
void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
|
|
|
|
pTask->checkpointingId = 0; // clear the checkpoint id
|
|
|
|
|
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
|
|
|
|
|
// the failed id and the start timestamp are only reset in their chkInfo spelling here
pTask->chkInfo.failedId = 0;
|
|
|
|
|
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
|
|
|
|
pTask->checkpointNotReadyTasks = 0;
|
|
|
|
|
pTask->checkpointAlignCnt = 0;
|
|
|
|
|
pTask->chkInfo.downstreamAlignNum = 0;
|
|
|
|
|
pTask->chkInfo.checkpointNotReadyTasks = 0;
|
|
|
|
|
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// streamSaveAllTaskStatus / streamSaveTaskCheckpointInfo: record a finished checkpoint for
// task `p` and persist the task.
//
// NOTE(review): this span is diff residue. The first two lines are the two revisions of the
// signature (with and without the explicit pMeta parameter), and many body statements appear
// twice, once per revision. The text does not mark which revision a line belongs to, so the
// notes below describe individual statements, not one linear execution.
//
// Statements visible here:
//   - returns `code` (still 0) when p->info.fillHistory == 1;
//   - asserts that chkInfo.checkpointId is below the in-progress checkpointing id and that the
//     in-progress id equals the `checkpointId` argument;
//   - copies the in-progress id into p->chkInfo.checkpointId, then calls streamTaskClearCheckInfo(p);
//   - reads a status string through streamTaskGetStatus(p, &str) before sending
//     TASK_EVENT_CHECKPOINT_DONE; `str` is later logged as "prev:%s";
//   - returns -1 when streamTaskHandleEvent does not return TSDB_CODE_SUCCESS;
//   - calls streamMetaSaveTask and streamMetaCommit between streamMetaWLock/streamMetaWUnLock
//     when the task level is not TASK_LEVEL__SINK.
//
// Return values visible: 0 on the fill-history exit, -1 on event failure, the streamMetaSaveTask
// code when saving fails, otherwise the last value assigned to `code`.
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId) {
|
|
|
|
|
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
|
|
|
|
SStreamMeta* pMeta = p->pMeta;
|
|
|
|
|
int32_t vgId = pMeta->vgId;
|
|
|
|
|
const char* id = p->id.idStr;
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
|
|
|
streamMetaWLock(pMeta);
|
|
|
|
|
|
|
|
|
|
|
// for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
|
|
|
|
// STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
|
|
|
|
// SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
|
|
|
|
// if (ppTask == NULL) {
|
|
|
|
|
// continue;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
// SStreamTask* p = *ppTask;
|
|
|
|
|
if (p->info.fillHistory == 1) {
|
|
|
|
|
// continue;
|
|
|
|
|
// NOTE(review): if this early return and the streamMetaWLock(pMeta) call above belong to the
// same revision, the meta write lock would still be held on return - confirm against the real file.
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
|
|
|
|
// the task mutex is taken here and released right after the CHECKPOINT_DONE event below
taosThreadMutexLock(&p->lock);
|
|
|
|
|
|
|
|
|
|
|
ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId);
|
|
|
|
|
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
|
|
|
|
|
|
|
|
|
|
|
p->chkInfo.checkpointId = p->checkpointingId;
|
|
|
|
|
// the in-progress id is copied before this call because the call zeroes it
streamTaskClearCheckInfo(p);
|
|
|
|
|
|
|
|
|
|
|
char* str = NULL;
|
|
|
|
|
streamTaskGetStatus(p, &str);
|
|
|
|
|
|
|
|
|
|
|
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
|
|
|
|
taosThreadMutexUnlock(&p->lock);
|
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId);
|
|
|
|
|
// the event-handling error code is replaced by -1 on this path
return -1;
|
|
|
|
|
} else { // save the task
|
|
|
|
|
streamMetaSaveTask(pMeta, p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
stDebug(
|
|
|
|
|
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
|
|
|
|
|
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
|
|
|
|
|
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
|
|
|
|
|
str);
|
|
|
|
|
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
|
|
|
|
|
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
|
|
|
|
|
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
|
|
|
|
|
|
|
|
|
|
|
// save the task if not sink task
|
|
|
|
|
if (p->info.taskLevel != TASK_LEVEL__SINK) {
|
|
|
|
|
streamMetaWLock(pMeta);
|
|
|
|
|
|
|
|
|
|
|
code = streamMetaSaveTask(pMeta, p);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
|
|
|
|
|
vgId, checkpointId, terrstr());
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
code = streamMetaCommit(pMeta);
|
|
|
|
|
// the two commit-result checks below (`code < 0` with an else/stInfo branch, and
// `code != TSDB_CODE_SUCCESS`) are the same check in the two revisions
if (code < 0) {
|
|
|
|
|
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
|
|
|
|
|
checkpointId, terrstr());
|
|
|
|
|
} else {
|
|
|
|
|
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s",
|
|
|
|
|
id, vgId, checkpointId, terrstr());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
}
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
|
|
|
|
SStreamMeta* pMeta = pTask->pMeta;
|
|
|
|
|
// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
|
|
|
|
// ASSERT(remain >= 0);
|
|
|
|
|
// streamTaskSetFailedId: copy the in-progress checkpoint id (chkInfo.checkpointingId) into both
// chkInfo.failedId and chkInfo.checkpointId. No locking is done here.
//
// NOTE(review): in the streamTaskBuildCheckpoint failure path visible in this file, this function
// is called after streamTaskClearCheckInfo(pTask), which sets chkInfo.checkpointingId to 0. In that
// order both fields written below would receive 0 and not the id of the failed checkpoint -
// confirm the intended call order.
void streamTaskSetFailedId(SStreamTask* pTask) {
|
|
|
|
|
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
|
|
|
|
|
// NOTE(review): the saved checkpoint id is also overwritten on failure - confirm this is intended
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
|
|
|
|
|
// if (remain == 0) { // all tasks are ready
|
|
|
|
|
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
|
|
|
|
|
streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId);
|
|
|
|
|
streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId);
|
|
|
|
|
stInfo(
|
|
|
|
|
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
|
|
|
|
|
"checkpointId:%" PRId64,
|
|
|
|
|
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
|
|
|
|
|
// } else {
|
|
|
|
|
// stInfo(
|
|
|
|
|
// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec
|
|
|
|
|
// " "not ready:%d/%d", pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain,
|
|
|
|
|
// pMeta->numOfStreamTasks);
|
|
|
|
|
// }
|
|
|
|
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
int64_t startTs = pTask->chkInfo.startTs;
|
|
|
|
|
int64_t ckId = pTask->chkInfo.checkpointingId;
|
|
|
|
|
|
|
|
|
|
// a sink task does not need to save the status or generate the checkpoint
|
|
|
|
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
|
|
|
|
stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel);
|
|
|
|
|
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send check point response to upstream task
|
|
|
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
|
|
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
|
|
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
|
|
|
|
} else {
|
|
|
|
@ -362,13 +350,36 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// record the failure checkpoint id
|
|
|
|
|
pTask->chkInfo.failedId = pTask->checkpointingId;
|
|
|
|
|
|
|
|
|
|
// todo: let's retry send rsp to upstream/mnode
|
|
|
|
|
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
|
|
|
|
pTask->checkpointingId, tstrerror(code));
|
|
|
|
|
ckId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully
|
|
|
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
|
|
|
code = streamSaveTaskCheckpointInfo(pTask, ckId);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
|
|
|
|
|
tstrerror(terrno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
|
streamTaskClearCheckInfo(pTask);
|
|
|
|
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
streamTaskSetFailedId(pTask);
|
|
|
|
|
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr,
|
|
|
|
|
ckId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
double el = (taosGetTimestampMs() - startTs) / 1000.0;
|
|
|
|
|
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ",
|
|
|
|
|
pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
|
|
|
|
|
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|