diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index cb4be7a539..a273f2edec 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = -` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); + qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); if (handle.execHandle.task == NULL) { tqError("cannot create exec task for %s", handle.subKey); code = -1; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 06813f0148..1b71a41586 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1098,7 +1098,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if(pOperator->numOfDownstream != 1){ + if (pOperator->numOfDownstream != 1) { qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); return -1; } @@ -1115,7 +1115,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:%"PRId64", %s" PRId64, pOffset->version + 1, id); + qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1123,6 +1123,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // those data are from the snapshot in tsdb, besides the data in the wal file. int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; + int32_t index = 0; // this value may be changed if new tables are created taosRLockLatch(&pTaskInfo->lock); @@ -1144,7 +1145,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { @@ -1183,12 +1184,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } - } else { // subType == TOPIC_SUB_TYPE__TABLE/DB + } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); + qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); return -1; } @@ -1218,7 +1219,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; - qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64" %s", mtInfo.uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -1226,12 +1227,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); return -1; } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; - qDebug("tmqsnap qStreamPrepareScan snapshot log"); + qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id); } }