From 0cd84aa587d95ad17adb4d14da6be70aec892398 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Jan 2024 17:51:50 +0800 Subject: [PATCH] fix(stream): gen checkpoint for single task. --- source/dnode/mnode/impl/src/mndStream.c | 41 ++++++++++--------- .../script/tsim/stream/fillHistoryBasic1.sim | 21 +++++----- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f276f3616b..1d40dd33b2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -3033,7 +3033,7 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { return NULL; } -static void doAddTaskId(SArray* pList, int32_t taskId) { +static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for(int32_t i = 0; i < num; ++i) { int32_t* pId = taosArrayGet(pList, i); @@ -3043,6 +3043,9 @@ static void doAddTaskId(SArray* pList, int32_t taskId) { } taosArrayPush(pList, &taskId); + + int32_t numOfTasks = taosArrayGetSize(pList); + mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks); } int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { @@ -3067,35 +3070,33 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - void **pReqTaskList = taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); + SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId); + doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); - mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, 1, numOfTasks - 1); + pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId); + doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + } - int32_t total = taosArrayGetSize(*pReqTaskList); - if (total == numOfTasks) { // all tasks has send the reqs - int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + int32_t total = taosArrayGetSize(*pReqTaskList); + if (total == numOfTasks) { // all tasks has send the reqs + int64_t checkpointId = mndStreamGenChkpId(pMnode); + mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); - // remove this entry - taosArrayDestroy(*(SArray**)pReqTaskList); - taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); + // remove this entry + taosArrayDestroy(*pReqTaskList); + taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); - int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); - } else { - mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, total, numOfTasks - total); - } + int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); } mndReleaseStream(pMnode, pStream); diff --git a/tests/script/tsim/stream/fillHistoryBasic1.sim b/tests/script/tsim/stream/fillHistoryBasic1.sim index da7969dd31..d2417a73ab 100644 --- a/tests/script/tsim/stream/fillHistoryBasic1.sim +++ b/tests/script/tsim/stream/fillHistoryBasic1.sim @@ -18,6 +18,7 @@ sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); sql create stream stream1 trigger at_once fill_history 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); +sleep 1000 sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791223001,2,2,3,1.1); @@ -224,53 +225,53 @@ endi # row 2 if $data21 != 1 then - print ======$data21 + print ======$data21, expect 1 goto loop01 endi if $data22 != 1 then - print ======$data22 + print ======$data22 , expect 1 goto loop01 endi if $data23 != 3 then - print ======$data23 + print ======$data23 , expect 3 goto loop01 endi if $data24 != 2 then - print ======$data24 + print ======$data24 , expect 2 goto loop01 endi if $data25 != 3 then - print ======$data25 + print ======$data25 , expect 3 goto loop01 endi # row 3 if $data31 != 1 then - print ======$data31 + print ======$data31 , expect 1 goto loop01 endi if $data32 != 1 then - print ======$data32 + print ======$data32 , expect 1 goto loop01 endi if $data33 != 4 then - print ======$data33 + print ======$data33 , expect 4 goto loop01 endi if $data34 != 2 then - print ======$data34 + print ======$data34 , expect 2 goto loop01 endi if $data35 != 3 then - print ======$data35 + print ======$data35 , expect 3 goto loop01 endi