diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c3f47cde9d..2ea23e73f2 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -273,6 +273,8 @@ typedef struct STableScanInfo { int32_t tableStartIndex; // current group scan start int32_t tableEndIndex; // current group scan end int32_t currentGroupIndex; // current group index of groupOffset + int32_t currentGroupId; + int32_t currentTable; int8_t scanMode; int8_t assignBlockUid; uint8_t countState; // empty table count state diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index eb84cb0639..1bccc514e4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1264,6 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; + pScanInfo->currentTable = 0; pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); @@ -1278,16 +1279,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - // we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start // position, let's find it from the beginning. index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { + pScanInfo->currentTable = index; pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, - numOfTables, pScanInfo->tableEndIndex, id); + numOfTables, pScanInfo->currentTable, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; } @@ -1310,12 +1312,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1); pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..6736d6e137 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -939,7 +939,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 >= numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); if (pOperator->dynamicTask) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); @@ -978,9 +978,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - if (pInfo->tableEndIndex == -1) { + if (pInfo->currentGroupId == -1) { int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 == numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } @@ -1034,7 +1034,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } if (pOperator->status == OP_EXEC_DONE) { - pInfo->tableEndIndex = -1; + pInfo->currentGroupId = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; while (true) { @@ -1059,23 +1059,24 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } // if no data, switch to next table and continue scan + pInfo->currentTable++; pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex >= numOfTables) { + if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); taosRUnLockLatch(&pTaskInfo->lock); return NULL; } - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex); + tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, - pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo)); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1167,6 +1168,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, if (code != TSDB_CODE_SUCCESS) { goto _error; } + + pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1; pInfo->currentGroupIndex = -1; @@ -1264,6 +1267,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6 pTableScanInfo->base.cond.startVersion = 0; pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->scanTimes = 0; + pTableScanInfo->currentGroupId = -1; pTableScanInfo->tableEndIndex = -1; pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; @@ -2167,6 +2171,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->pTableScanOp->status = OP_OPENED; pTSInfo->scanTimes = 0; + pTSInfo->currentGroupId = -1; pTSInfo->tableEndIndex = -1; }