refactor(stream): do some internal refactor.
This commit is contained in:
parent
6798e31bb5
commit
035b199497
|
@ -136,15 +136,31 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
pTask->checkpointingId = pReq->checkpointId;
|
pTask->checkpointingId = pReq->checkpointId;
|
||||||
pTask->checkpointNotReadyTasks = 1;
|
pTask->checkpointNotReadyTasks = 1;
|
||||||
|
|
||||||
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else.
|
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
|
||||||
// 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already.
|
// inputQ, to make sure all blocks with less version have been handled by this task already.
|
||||||
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||||
|
pBlock->srcTaskId = pTask->id.taskId;
|
||||||
|
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||||
|
|
||||||
|
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
||||||
|
if (code == 0) {
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||||
int64_t checkpointId = pDataBlock->info.version;
|
int64_t checkpointId = pDataBlock->info.version;
|
||||||
|
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// set the task status
|
// set the task status
|
||||||
pTask->checkpointingId = checkpointId;
|
pTask->checkpointingId = checkpointId;
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK;
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
|
@ -153,20 +169,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
pBlock->srcTaskId = pTask->id.taskId;
|
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
|
||||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
|
} else { // only one task exists, no need to dispatch downstream info
|
||||||
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr,
|
|
||||||
pTask->info.selfChildId);
|
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
|
||||||
if (code != 0) { // todo failed to add it into the output queue, free it.
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
} else { // only one task exists
|
|
||||||
streamProcessCheckpointReadyMsg(pTask);
|
streamProcessCheckpointReadyMsg(pTask);
|
||||||
}
|
}
|
||||||
} else if (taskLevel == TASK_LEVEL__SINK) {
|
} else if (taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
@ -176,7 +181,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
|
|
||||||
// update the child Id for downstream tasks
|
// update the child Id for downstream tasks
|
||||||
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
||||||
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr);
|
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
||||||
|
|
||||||
|
@ -188,14 +193,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
|
||||||
if (notReady > 0) {
|
if (notReady > 0) {
|
||||||
qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
|
qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
|
||||||
pTask->id.idStr, pTask->info.selfChildId, notReady, num);
|
id, pTask->info.selfChildId, notReady, num);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug(
|
qDebug(
|
||||||
"s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to "
|
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream",
|
||||||
"downstream",
|
id, num);
|
||||||
pTask->id.idStr, num);
|
|
||||||
|
|
||||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||||
// can start local checkpoint procedure
|
// can start local checkpoint procedure
|
||||||
|
@ -204,23 +208,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
|
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
|
||||||
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
||||||
// already. And then, dispatch check point msg to all downstream tasks
|
// already. And then, dispatch check point msg to all downstream tasks
|
||||||
|
code = continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
{
|
|
||||||
pBlock->srcTaskId = pTask->id.taskId;
|
|
||||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
|
||||||
|
|
||||||
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
|
||||||
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
|
||||||
if (code != 0) { // todo failed to add it into the output queue, free it.
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue