refactor: do some internal refactor and add some logs for tmq.
This commit is contained in:
parent
b46098793d
commit
a28156e832
|
@ -1048,15 +1048,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
||||
pTaskInfo->streamInfo.returned = 0;
|
||||
|
||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
uint16_t type = pOperator->operatorType;
|
||||
pOperator->status = OP_OPENED;
|
||||
|
||||
// TODO add more check
|
||||
if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
ASSERT(pOperator->numOfDownstream == 1);
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
|
@ -1070,7 +1071,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||
// iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
|
||||
// those data are from the snapshot in tsdb, besides the data in the wal file.
|
||||
int64_t uid = pOffset->uid;
|
||||
int64_t ts = pOffset->ts;
|
||||
|
||||
|
@ -1129,7 +1131,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||
ts, pTableScanInfo->currentTable, numOfTables);
|
||||
/*}*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -1172,7 +1173,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
|
@ -1180,7 +1181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
|
||||
return -1;
|
||||
}
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
|
||||
} else if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
tsdbReaderClose(pInfo->dataReader);
|
||||
|
|
Loading…
Reference in New Issue