diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eaa1197a84..d042e463f0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -668,6 +668,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti } pDataBlock->info.rows = size; pDataBlock->info.type = STREAM_REPROCESS; + blockDataUpdateTsWindow(pDataBlock); taosArrayClear(pInfo->tsArray); return pDataBlock; } @@ -768,6 +769,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } rows = pBlockInfo->rows; doFilter(pInfo->pCondition, pInfo->pRes, NULL); + blockDataUpdateTsWindow(pInfo->pRes); break; } diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index d2cccd0532..b536a70515 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -219,6 +219,41 @@ class TDTestCase: tdSql.query("drop topic %s"%topicName1) + tdLog.info("creat the same topic name , and start to consume") + self.initConsumerTable() + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != expectrowcnt: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + tdLog.printNoPrefix("======== test case 1 end ...... ") def tmqCase2(self, cfgPath, buildPath):