refactor: do some internal refactor.
This commit is contained in:
parent
56c98d7768
commit
c8ad465a0d
|
@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
handle.execHandle.task =
|
||||
` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
||||
if (handle.execHandle.task == NULL) {
|
||||
tqError("cannot create exec task for %s", handle.subKey);
|
||||
code = -1;
|
||||
|
|
|
@ -1098,7 +1098,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
// TODO add more check
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if(pOperator->numOfDownstream != 1){
|
||||
if (pOperator->numOfDownstream != 1) {
|
||||
qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1115,7 +1115,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
// let's seek to the next version in wal file
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
|
||||
qError("tqSeekVer failed ver:%"PRId64", %s" PRId64, pOffset->version + 1, id);
|
||||
qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
@ -1123,6 +1123,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
// those data are from the snapshot in tsdb, besides the data in the wal file.
|
||||
int64_t uid = pOffset->uid;
|
||||
int64_t ts = pOffset->ts;
|
||||
int32_t index = 0;
|
||||
|
||||
// this value may be changed if new tables are created
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
|
@ -1144,7 +1145,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||
|
||||
// start from current accessed position
|
||||
int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable);
|
||||
index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable);
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
if (index >= 0) {
|
||||
|
@ -1183,12 +1184,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
return -1;
|
||||
}
|
||||
|
||||
} else { // subType == TOPIC_SUB_TYPE__TABLE/DB
|
||||
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
|
||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
|
||||
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1218,7 +1219,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64" %s", mtInfo.uid, pOffset->ts, id);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
|
@ -1226,12 +1227,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
|
||||
return -1;
|
||||
}
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id);
|
||||
} else if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
tsdbReaderClose(pInfo->dataReader);
|
||||
pInfo->dataReader = NULL;
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot log");
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue