From 2ae05b8cf6e962a6a709678e54781ba8429666b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Apr 2023 08:56:31 +0800 Subject: [PATCH] fix(stream): set the correct initial offset value. --- source/dnode/vnode/src/tq/tq.c | 7 +++++-- tests/system-test/1-insert/delete_childtable.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 190a0893a8..7c7f59b6b7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -130,6 +130,8 @@ int32_t tqInitialize(STQ* pTq) { return -1; } + // the version is kept in task's meta data + // todo check if this version is required or not if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) { return -1; } @@ -554,7 +556,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return 0; } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) { +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; @@ -570,6 +572,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t UNUSED_PARAM(ver)) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + pTask->chkInfo.version = ver; // expand executor if (pTask->fillHistory) { @@ -755,7 +758,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); - // 2.save task + // 2.save task, use the newest commit version as the initial start version of stream task. code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, diff --git a/tests/system-test/1-insert/delete_childtable.py b/tests/system-test/1-insert/delete_childtable.py index e3144edb45..a12f884981 100644 --- a/tests/system-test/1-insert/delete_childtable.py +++ b/tests/system-test/1-insert/delete_childtable.py @@ -27,7 +27,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = 'db_test' self.setsql = TDSetSql() self.stbname = 'stb'