fix: stream scan core due to table end index introduced in 1 null row for empty group

This commit is contained in:
slzhou 2024-01-16 09:28:45 +08:00
parent 53094f62d8
commit 764365047d
3 changed files with 20 additions and 11 deletions

View File

@ -273,6 +273,8 @@ typedef struct STableScanInfo {
int32_t tableStartIndex; // current group scan start
int32_t tableEndIndex; // current group scan end
int32_t currentGroupIndex; // current group index of groupOffset
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
uint8_t countState; // empty table count state

View File

@ -1264,6 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
pScanInfo->currentTable = 0;
pScanInfo->tableEndIndex = 0;
} else {
taosRUnLockLatch(&pTaskInfo->lock);
@ -1278,16 +1279,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->pTableScanOp->resultInfo.totalRows = 0;
// start from current accessed position
// we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
// position, let's find it from the beginning.
index = tableListFind(pTableListInfo, uid, 0);
taosRUnLockLatch(&pTaskInfo->lock);
if (index >= 0) {
pScanInfo->currentTable = index;
pScanInfo->tableEndIndex = index;
} else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->tableEndIndex, id);
numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
@ -1310,12 +1312,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
} else {
pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
}
// restore the key value

View File

@ -939,7 +939,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex + 1 >= numOfTables) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
if (pOperator->dynamicTask) {
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
@ -978,9 +978,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
if (pInfo->tableEndIndex == -1) {
if (pInfo->currentGroupId == -1) {
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex + 1 == numOfTables) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
}
@ -1034,7 +1034,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (pOperator->status == OP_EXEC_DONE) {
pInfo->tableEndIndex = -1;
pInfo->currentGroupId = -1;
pOperator->status = OP_OPENED;
SSDataBlock* result = NULL;
while (true) {
@ -1059,23 +1059,24 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
// if no data, switch to next table and continue scan
pInfo->currentTable++;
pInfo->tableEndIndex++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->tableEndIndex >= numOfTables) {
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return NULL;
}
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex);
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo));
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
@ -1167,6 +1168,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->currentGroupId = -1;
pInfo->tableEndIndex = -1;
pInfo->currentGroupIndex = -1;
@ -1264,6 +1267,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
pTableScanInfo->base.cond.startVersion = 0;
pTableScanInfo->base.cond.endVersion = ver;
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
pTableScanInfo->tableEndIndex = -1;
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -2167,6 +2171,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pTableScanOp->status = OP_OPENED;
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTSInfo->tableEndIndex = -1;
}