Merge branch '3.0' into feature/TD-13066-3.0

This commit is contained in:
Cary Xu 2022-05-18 17:24:07 +08:00
commit 30c09b5b9d
2 changed files with 37 additions and 0 deletions

View File

@ -668,6 +668,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
} }
pDataBlock->info.rows = size; pDataBlock->info.rows = size;
pDataBlock->info.type = STREAM_REPROCESS; pDataBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pDataBlock);
taosArrayClear(pInfo->tsArray); taosArrayClear(pInfo->tsArray);
return pDataBlock; return pDataBlock;
} }
@ -768,6 +769,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
rows = pBlockInfo->rows; rows = pBlockInfo->rows;
doFilter(pInfo->pCondition, pInfo->pRes, NULL); doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes);
break; break;
} }

View File

@ -219,6 +219,41 @@ class TDTestCase:
tdSql.query("drop topic %s"%topicName1) tdSql.query("drop topic %s"%topicName1)
tdLog.info("creat the same topic name , and start to consume")
self.initConsumerTable()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath): def tmqCase2(self, cfgPath, buildPath):