fix(stream): set not-ready initial value.

This commit is contained in:
Haojun Liao 2023-07-18 18:29:35 +08:00
parent d2c875e774
commit 902f9d1f94
3 changed files with 16 additions and 1 deletions

View File

@ -248,7 +248,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
ASSERT(pInfo != NULL);
if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId);
qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
} else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked

View File

@ -174,6 +174,16 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// set task status
pTask->status.taskStatus = TASK_STATUS__CK;
{ // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta;
taosWLockLatch(&pMeta->lock);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
}
taosWUnLockLatch(&pMeta->lock);
}
//todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {

View File

@ -545,6 +545,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
// check for all tasks, and do generate the vnode-wide checkpoint data.
// todo extract method
SStreamMeta* pMeta = pTask->pMeta;
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
@ -554,6 +556,9 @@ int32_t streamTryExec(SStreamTask* pTask) {
streamSaveTasks(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status:%d, total:%d", pMeta->vgId, remain,
(int32_t)taosArrayGetSize(pMeta->pTaskList));
}
// send check point response to upstream task