From 69c9eda7af8545d9d8cbbe212634bcdd36bbab33 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Jun 2023 11:37:38 +0800 Subject: [PATCH] fix(stream): fix race condition. --- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 4 +++ source/dnode/vnode/src/tq/tq.c | 29 +++++++++++++++++++--- source/libs/stream/src/stream.c | 5 ++-- source/libs/stream/src/streamExec.c | 4 +-- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 745247b425..9e2705461d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -309,6 +309,7 @@ struct SStreamTask { STaskExec exec; SHistDataRange dataRange; SStreamId historyTaskId; + SStreamId streamTaskId; SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 1ab11154f0..b6274b57b8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -319,6 +319,10 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { (*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId; (*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId; + + (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; + (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; + mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 18291ac70f..a7adf247c8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1088,7 +1088,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr); + tqDebug("s-task:%s start history data 
scan stage(step 1)", pTask->id.idStr); int64_t st = taosGetTimestampMs(); streamSourceRecoverScanStep1(pTask); @@ -1100,10 +1100,31 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); - if (pTask->info.fillHistory) { - // todo transfer the executor status, and then destroy this stream task + if (pTask->info.fillHistory) {/* + // 1. stop the related stream task, get the current scan wal version of stream task, ver1. + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + if (pStreamTask == NULL) { + // todo handle error + } + + pStreamTask->status.taskStatus = TASK_STATUS__PAUSE; + + + // if it's a source task, extract the last version in wal. + + // 2. wait for downstream tasks to complete + + + // 3. do a secondary scan of the history data; the time window remains unchanged, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] + + + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. + + + // 5. resume the related stream task. +*/ } else { // todo update the chkInfo version for current task. 
// this task has an associated history stream task, so we need to scan wal from the end version of diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index bfb28c0919..510c18ab65 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -264,13 +264,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } pTask->msgInfo.retryCount = 0; - int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); - ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus); // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp - if (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) { + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time int32_t waitDuration = 300; // 300 ms diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b8ae8fc7fd..e4d28e7dd7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -440,8 +440,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", - id, el, resSize / 1048576.0, totalBlocks); + qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", + id, batchSize, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); }