opti:tmq logic & fix CI cases
This commit is contained in:
parent
d0c55ef572
commit
9d9ae749d1
|
@ -395,6 +395,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
return -1;
|
||||
}
|
||||
|
||||
// offset set to previous version when init
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
|
|
|
@ -65,7 +65,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
|
||||
qTaskInfo_t task = pExec->task;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||
tqError("prepare scan failed, return");
|
||||
|
|
|
@ -1077,6 +1077,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
||||
// if pOffset equal to current offset, means continue consume
|
||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -1097,7 +1098,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
// let's seek to the next version in wal file
|
||||
// set version to read for wal is next, so +1
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
|
||||
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
|
||||
return -1;
|
||||
|
@ -1119,8 +1120,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
}
|
||||
}
|
||||
|
||||
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
||||
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
|
||||
|
|
|
@ -1617,18 +1617,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
|
||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||
while (1) {
|
||||
SFetchRet ret = {0};
|
||||
tqNextBlock(pInfo->tqReader, &ret);
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); //curVersion move to next, so currentOffset = curVersion - 1
|
||||
|
||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||
qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
|
||||
|
|
|
@ -53,7 +53,7 @@ class TDTestCase:
|
|||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
|
@ -237,10 +237,10 @@ class TDTestCase:
|
|||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 2
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
elif self.snapshot == 1:
|
||||
consumerId = 3
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4))
|
||||
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 1
|
||||
|
@ -270,7 +270,7 @@ class TDTestCase:
|
|||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
elif self.snapshot == 1:
|
||||
if totalConsumeRows != totalRowsFromQuery:
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error with snapshot = 1!")
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
@ -323,7 +323,7 @@ class TDTestCase:
|
|||
|
||||
if self.snapshot == 0:
|
||||
consumerId = 4
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1/4 + 3/4))
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
elif self.snapshot == 1:
|
||||
consumerId = 5
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
|
||||
|
@ -369,11 +369,7 @@ class TDTestCase:
|
|||
tdLog.info("act consume rows: %d, act query rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsFromQuery, expectrowcnt))
|
||||
|
||||
if self.snapshot == 0:
|
||||
# If data writing is completed before consumer get snapshot, will consume 7500 from wal;
|
||||
# If data writing has not started before consumer get snapshot, will consume 10000 from wal;
|
||||
minRows = int(expectrowcnt * (1 - 1/4)) # 7500
|
||||
tdLog.info("consume rows should be between %d and %d, "%(minRows, expectrowcnt))
|
||||
if not ((totalConsumeRows >= minRows) and (totalConsumeRows <= expectrowcnt)):
|
||||
if (totalConsumeRows != expectrowcnt):
|
||||
tdLog.exit("tmq consume rows error with snapshot = 0!")
|
||||
elif self.snapshot == 1:
|
||||
tdLog.info("consume rows should be between %d and %d, "%(totalRowsFromQuery, expectrowcnt))
|
||||
|
@ -494,7 +490,7 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test case 4 end ...... ")
|
||||
|
||||
def run(self):
|
||||
# tdSql.prepare()
|
||||
tdSql.prepare()
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.snapshot = 0
|
||||
|
@ -520,11 +516,11 @@ class TDTestCase:
|
|||
self.prepareTestEnv()
|
||||
self.tmqCase3()
|
||||
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.snapshot = 0
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase4()
|
||||
# tdLog.printNoPrefix("=============================================")
|
||||
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
# self.snapshot = 0
|
||||
# self.prepareTestEnv()
|
||||
# self.tmqCase4()
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
|
|
Loading…
Reference in New Issue