fix(stream): dispatch checkpoint msg to downstream by puting message into input queue.

This commit is contained in:
Haojun Liao 2023-07-13 09:17:20 +08:00
parent 3ce54423a5
commit 08a4fb06ae
8 changed files with 114 additions and 87 deletions

View File

@ -152,6 +152,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__CHECKPOINT_TRIGGER,
STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY, STREAM_INPUT__DESTROY,
}; };

View File

@ -234,12 +234,12 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
int32_t status = pTask->status.taskStatus;
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
int32_t status = pTask->status.taskStatus;
if (status != TASK_STATUS__NORMAL) { if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);

View File

@ -50,7 +50,8 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);

View File

@ -239,7 +239,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked // todo: if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked
streamTaskEnqueueBlocks(pTask, pReq, pRsp); streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
@ -320,7 +320,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
destroyStreamDataBlock((SStreamDataBlock*)pItem); destroyStreamDataBlock((SStreamDataBlock*)pItem);
return code; return code;
} }
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later.

View File

@ -123,52 +123,13 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
} }
static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamCheckpointReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.downstreamNodeId = pTask->info.nodeId,
.downstreamTaskId = pTask->id.taskId,
.childId = pTask->info.selfChildId,
.checkpointId = checkpointId,
};
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // no need to dispatch msg to downstream task
qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr);
streamProcessCheckpointRsp(NULL, pTask);
}
return 0;
}
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) {
SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) { if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pChkpoint->type = STREAM_INPUT__CHECKPOINT; pChkpoint->type = checkpointType;
pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pChkpoint->pBlock == NULL) { if (pChkpoint->pBlock == NULL) {
taosFreeQitem(pChkpoint); taosFreeQitem(pChkpoint);
@ -187,8 +148,6 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) {
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
int32_t code = 0; int32_t code = 0;
int64_t checkpointId = pReq->checkpointId;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
@ -197,7 +156,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
pTask->checkpointingId = pReq->checkpointId; pTask->checkpointingId = pReq->checkpointId;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else.
streamTaskDispatchCheckpointMsg(pTask, checkpointId); // 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already.
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
streamSchedExec(pTask);
return code; return code;
} }
@ -208,10 +169,14 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
// set the task status // set the task status
pTask->checkpointingId = checkpointId; pTask->checkpointingId = checkpointId;
pTask->status.taskStatus = TASK_STATUS__CK; pTask->status.taskStatus = TASK_STATUS__CK;
//todo fix race condition: set the status and append checkpoint block
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
appendCheckpointIntoInputQ(pTask); // todo: sink node needs alignment??
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
streamSchedExec(pTask); streamSchedExec(pTask);
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr); qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
} else { } else {
@ -221,23 +186,27 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
if (notReady > 0) { if (notReady > 0) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d", qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, notReady, num); pTask->id.idStr, notReady, num);
return 0; return 0;
} }
qDebug("s-task:%s received checkpoint req, all upstream sent checkpoint msg, dispatch checkpoint msg to downstream", qDebug(
pTask->id.idStr); "s-task:%s receive one checkpoint req, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to "
"downstream",
pTask->id.idStr, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this node // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure // can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// dispatch check point msg to all downstream tasks // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
streamTaskDispatchCheckpointMsg(pTask, checkpointId); // already. And then, dispatch check point msg to all downstream tasks
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
streamSchedExec(pTask);
} }
return 0; return 0;
@ -255,7 +224,7 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
if (notReady == 0) { if (notReady == 0) {
qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
pTask->id.idStr); pTask->id.idStr);
appendCheckpointIntoInputQ(pTask); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
streamSchedExec(pTask); streamSchedExec(pTask);
} else { } else {
int32_t total = streamTaskGetNumOfDownstream(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask);
@ -269,16 +238,20 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId); ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId);
p->chkInfo.keptCheckpointId = p->checkpointingId; p->chkInfo.keptCheckpointId = p->checkpointingId;
int8_t prev = p->status.taskStatus;
p->status.taskStatus = TASK_STATUS__NORMAL;
streamMetaSaveTask(pMeta, p); streamMetaSaveTask(pMeta, p);
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
", ver:%" PRId64 " currentVer:%" PRId64, " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer); pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer,
streamGetTaskStatusStr(prev));
} }
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {

View File

@ -415,8 +415,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId,
SEpSet* pEpSet) { SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
@ -451,6 +451,45 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointR
return 0; return 0;
} }
int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) {
SStreamCheckpointReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.downstreamNodeId = pTask->info.nodeId,
.downstreamTaskId = pTask->id.taskId,
.childId = pTask->info.selfChildId,
.checkpointId = checkpointId,
};
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
doDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
doDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // no need to dispatch msg to downstream task
qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr);
streamProcessCheckpointRsp(NULL, pTask);
}
return 0;
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId}; SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId};
@ -488,11 +527,7 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
} }
taosArrayClear(pTask->pRpcMsgList); taosArrayClear(pTask->pRpcMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr);
int8_t prev = pTask->status.taskStatus;
pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -505,11 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosArrayClear(pTask->pRpcMsgList); taosArrayClear(pTask->pRpcMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr);
int8_t prev = pTask->status.taskStatus;
pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -293,6 +293,13 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
} }
#endif #endif
static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) {
int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue);
int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall);
return numOfItems1 + numOfItems2;
}
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
// wait for the stream task to be idle // wait for the stream task to be idle
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -313,11 +320,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", pTask->id.idStr,
pTask->id.idStr, pTask->streamTaskId.taskId); pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else { } else {
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr,
pStreamTask->id.idStr);
} }
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
@ -395,8 +403,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
SArray* pBlockList = pMerged->submits; SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
numOfBlocks, pMerged->ver); pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT((*pVer) < pMerged->ver); ASSERT((*pVer) < pMerged->ver);
(*pVer) = pMerged->ver; (*pVer) = pMerged->ver;
@ -433,7 +441,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0); ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) { if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask); int32_t code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0; return 0;
} }
} }
@ -442,20 +450,33 @@ int32_t streamExecForAll(SStreamTask* pTask) {
return 0; return 0;
} }
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { int32_t type = pInput->type;
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK || pInput->type == STREAM_INPUT__CHECKPOINT); if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
ASSERT(getNumOfItemsInputQ(pTask) == 1);
}
if (pInput->type == STREAM_INPUT__DATA_BLOCK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
if (type == STREAM_INPUT__DATA_BLOCK) {
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} else { // for sink task, do nothing. } else { // pInput->type == STREAM_INPUT__CHECKPOINT, for sink task, do nothing.
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY; pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0; return 0;
} }
} }
// dispatch checkpoint msg to all downstream tasks
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
qDebug("s-task:%s start to dispatch checkpoint msg to downstream", id);
streamTaskDispatchCheckpointMsg(pTask, pTask->checkpointingId);
return 0;
}
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
const SStreamQueueItem* pItem = pInput; const SStreamQueueItem* pItem = pInput;
@ -475,12 +496,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// update the currentVer if processing the submit blocks. // update the currentVer if processing the submit blocks.
ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version); ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version);
if(ver != pTask->chkInfo.version) { if (ver != pTask->chkInfo.version) {
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
pTask->chkInfo.version); pTask->chkInfo.version);
} }
int32_t type = pInput->type;
streamFreeQitem(pInput); streamFreeQitem(pInput);
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
@ -550,8 +570,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode // todo: let's retry send rsp to upstream/mnode
qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%"PRId64", code:%s", qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s",
pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); pTask->id.idStr, pTask->checkpointingId, tstrerror(code));
} }
} else { } else {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&

View File

@ -190,7 +190,8 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
} }
// do not merge blocks for sink node and check point data block // do not merge blocks for sink node and check point data block
if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || (qItem->type == STREAM_INPUT__CHECKPOINT)) { if ((pTask->info.taskLevel == TASK_LEVEL__SINK) ||
(qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) {
*numOfBlocks = 1; *numOfBlocks = 1;
*pInput = qItem; *pInput = qItem;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;