fix(tmq): prepare scan should reset operator status
This commit is contained in:
parent
31bbef82d1
commit
8127887dc4
|
@ -2843,11 +2843,18 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
||||||
|
|
||||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
int32_t type = pOperator->operatorType;
|
int32_t type = pOperator->operatorType;
|
||||||
|
|
||||||
|
pOperator->status = OP_OPENED;
|
||||||
|
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
||||||
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
|
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
|
||||||
|
|
||||||
|
pScanInfo->pSnapshotReadOp->status = OP_OPENED;
|
||||||
|
|
||||||
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
|
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
|
||||||
|
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
|
||||||
|
|
||||||
if (uid == 0) {
|
if (uid == 0) {
|
||||||
pInfo->noTable = 1;
|
pInfo->noTable = 1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2861,14 +2868,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
pInfo->noTable = 0;
|
pInfo->noTable = 0;
|
||||||
|
|
||||||
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
|
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
|
||||||
tsdbSetTableId(pInfo->dataReader, uid);
|
|
||||||
int64_t oldSkey = pInfo->cond.twindows[0].skey;
|
|
||||||
pInfo->cond.twindows[0].skey = ts + 1;
|
|
||||||
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
|
||||||
pInfo->cond.twindows[0].skey = oldSkey;
|
|
||||||
pInfo->scanTimes = 0;
|
|
||||||
pInfo->curTWinIdx = 0;
|
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
@ -2880,8 +2879,17 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
pInfo->currentTable = i;
|
pInfo->currentTable = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO after processing drop,
|
// TODO after processing drop, found can be false
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
|
tsdbSetTableId(pInfo->dataReader, uid);
|
||||||
|
int64_t oldSkey = pInfo->cond.twindows[0].skey;
|
||||||
|
pInfo->cond.twindows[0].skey = ts + 1;
|
||||||
|
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||||
|
pInfo->cond.twindows[0].skey = oldSkey;
|
||||||
|
pInfo->scanTimes = 0;
|
||||||
|
pInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
pInfo->currentTable, tableSz);
|
pInfo->currentTable, tableSz);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue