refactor(stream)

This commit is contained in:
Liu Jicong 2022-07-10 14:10:39 +08:00
parent 2225411edc
commit bafa54778a
2 changed files with 8 additions and 3 deletions

View File

@ -318,7 +318,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 3.query
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
fetchOffsetNew.version++;
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
}
if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;

View File

@ -427,8 +427,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
// todo refactor
pTableScanInfo->lastStatus.uid = pBlock->info.uid;
pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;
/*pTableScanInfo->lastStatus.uid = pBlock->info.uid;*/
/*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.uid != 0);
return pBlock;