Merge pull request #24482 from taosdata/szhou/fix/td-28222-2
fix: stream scan core due to table end index introduced in 1 null row…
This commit is contained in:
commit
497d07a198
|
@ -273,7 +273,8 @@ typedef struct STableScanInfo {
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
int32_t tableStartIndex; // current group scan start
|
int32_t tableStartIndex; // current group scan start
|
||||||
int32_t tableEndIndex; // current group scan end
|
int32_t tableEndIndex; // current group scan end
|
||||||
int32_t currentGroupIndex; // current group index of groupOffset
|
int32_t currentGroupId;
|
||||||
|
int32_t currentTable;
|
||||||
int8_t scanMode;
|
int8_t scanMode;
|
||||||
int8_t assignBlockUid;
|
int8_t assignBlockUid;
|
||||||
uint8_t countState; // empty table count state
|
uint8_t countState; // empty table count state
|
||||||
|
|
|
@ -646,10 +646,6 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (initRemainGroups) {
|
|
||||||
pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsTagFilterCache) {
|
if (tsTagFilterCache) {
|
||||||
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
|
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
|
||||||
pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest),
|
pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest),
|
||||||
|
@ -2142,6 +2138,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
} else if (groupByTbname && pScanNode->groupOrderScan){
|
} else if (groupByTbname && pScanNode->groupOrderScan){
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
|
} else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
|
||||||
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
} else {
|
} else {
|
||||||
pTableListInfo->numOfOuputGroups = 1;
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
}
|
}
|
||||||
|
@ -2159,6 +2157,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
if (groupSort || pScanNode->groupOrderScan) {
|
if (groupSort || pScanNode->groupOrderScan) {
|
||||||
code = sortTableGroup(pTableListInfo);
|
code = sortTableGroup(pTableListInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1264,7 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
|
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
|
||||||
uid = pTableInfo->uid;
|
uid = pTableInfo->uid;
|
||||||
ts = INT64_MIN;
|
ts = INT64_MIN;
|
||||||
pScanInfo->tableEndIndex = 0;
|
pScanInfo->currentTable = 0;
|
||||||
} else {
|
} else {
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
qError("no table in table list, %s", id);
|
qError("no table in table list, %s", id);
|
||||||
|
@ -1278,16 +1278,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||||
|
|
||||||
// start from current accessed position
|
// start from current accessed position
|
||||||
// we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start
|
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
|
||||||
// position, let's find it from the beginning.
|
// position, let's find it from the beginning.
|
||||||
index = tableListFind(pTableListInfo, uid, 0);
|
index = tableListFind(pTableListInfo, uid, 0);
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pScanInfo->tableEndIndex = index;
|
pScanInfo->currentTable = index;
|
||||||
} else {
|
} else {
|
||||||
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
|
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
|
||||||
numOfTables, pScanInfo->tableEndIndex, id);
|
numOfTables, pScanInfo->currentTable, id);
|
||||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1310,12 +1310,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
|
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
|
||||||
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
|
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
|
||||||
} else {
|
} else {
|
||||||
pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
||||||
pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
||||||
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
||||||
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
|
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore the key value
|
// restore the key value
|
||||||
|
|
|
@ -657,33 +657,17 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
|
||||||
|
|
||||||
|
|
||||||
static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) {
|
static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) {
|
||||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
|
||||||
|
|
||||||
STableListInfo* pTableListInfo = pInfo->base.pTableListInfo;
|
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
|
||||||
STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableListInfo, pInfo->tableStartIndex);
|
|
||||||
|
|
||||||
if (pTableListInfo->oneTableForEachGroup) {
|
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
|
||||||
pInfo->tableEndIndex = pInfo->tableStartIndex;
|
|
||||||
} else if (pTableListInfo->groupOffset) {
|
|
||||||
pInfo->currentGroupIndex++;
|
|
||||||
if (pInfo->currentGroupIndex + 1 < pTableListInfo->numOfOuputGroups) {
|
|
||||||
pInfo->tableEndIndex = pTableListInfo->groupOffset[pInfo->currentGroupIndex + 1] - 1;
|
|
||||||
} else {
|
|
||||||
pInfo->tableEndIndex = numOfTables - 1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pInfo->tableEndIndex = numOfTables - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pInfo->needCountEmptyTable) {
|
if (!pInfo->needCountEmptyTable) {
|
||||||
pInfo->countState = TABLE_COUNT_STATE_END;
|
pInfo->countState = TABLE_COUNT_STATE_END;
|
||||||
} else {
|
} else {
|
||||||
pInfo->countState = TABLE_COUNT_STATE_SCAN;
|
pInfo->countState = TABLE_COUNT_STATE_SCAN;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pKeyInfo = pStart;
|
|
||||||
*size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
|
void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
|
||||||
|
@ -939,7 +923,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
if (pInfo->tableEndIndex + 1 >= numOfTables) {
|
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
if (pOperator->dynamicTask) {
|
if (pOperator->dynamicTask) {
|
||||||
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
||||||
|
@ -978,13 +962,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
STableKeyInfo* pList = NULL;
|
STableKeyInfo* pList = NULL;
|
||||||
|
|
||||||
if (pInfo->tableEndIndex == -1) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
if (pInfo->tableEndIndex + 1 == numOfTables) {
|
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
initNextGroupScan(pInfo, &pList, &num);
|
initNextGroupScan(pInfo, &pList, &num);
|
||||||
ASSERT(pInfo->base.dataReader == NULL);
|
ASSERT(pInfo->base.dataReader == NULL);
|
||||||
|
|
||||||
|
@ -1034,7 +1019,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
pInfo->tableEndIndex = -1;
|
pInfo->currentGroupId = -1;
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
SSDataBlock* result = NULL;
|
SSDataBlock* result = NULL;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -1059,23 +1044,23 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no data, switch to next table and continue scan
|
// if no data, switch to next table and continue scan
|
||||||
pInfo->tableEndIndex++;
|
pInfo->currentTable++;
|
||||||
|
|
||||||
taosRLockLatch(&pTaskInfo->lock);
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
|
||||||
if (pInfo->tableEndIndex >= numOfTables) {
|
if (pInfo->currentTable >= numOfTables) {
|
||||||
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex);
|
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
|
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
|
||||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
|
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
|
||||||
pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo));
|
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
|
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
@ -1167,9 +1152,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->currentGroupId = -1;
|
||||||
|
|
||||||
pInfo->tableEndIndex = -1;
|
pInfo->tableEndIndex = -1;
|
||||||
pInfo->currentGroupIndex = -1;
|
|
||||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||||
pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
|
pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
|
||||||
|
|
||||||
|
@ -1264,6 +1250,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
|
||||||
pTableScanInfo->base.cond.startVersion = 0;
|
pTableScanInfo->base.cond.startVersion = 0;
|
||||||
pTableScanInfo->base.cond.endVersion = ver;
|
pTableScanInfo->base.cond.endVersion = ver;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
pTableScanInfo->currentGroupId = -1;
|
||||||
pTableScanInfo->tableEndIndex = -1;
|
pTableScanInfo->tableEndIndex = -1;
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
@ -2167,7 +2154,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->pTableScanOp->status = OP_OPENED;
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
|
|
||||||
pTSInfo->scanTimes = 0;
|
pTSInfo->scanTimes = 0;
|
||||||
pTSInfo->tableEndIndex = -1;
|
pTSInfo->currentGroupId = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||||
|
|
Loading…
Reference in New Issue