From 764365047d07bc83ffff50c10bac1ad179665fd1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 09:28:45 +0800 Subject: [PATCH 1/6] fix: stream scan core due to table end index introduced in 1 null row for empty group --- source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/executor.c | 10 ++++++---- source/libs/executor/src/scanoperator.c | 19 ++++++++++++------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c3f47cde9d..2ea23e73f2 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -273,6 +273,8 @@ typedef struct STableScanInfo { int32_t tableStartIndex; // current group scan start int32_t tableEndIndex; // current group scan end int32_t currentGroupIndex; // current group index of groupOffset + int32_t currentGroupId; + int32_t currentTable; int8_t scanMode; int8_t assignBlockUid; uint8_t countState; // empty table count state diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index eb84cb0639..1bccc514e4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1264,6 +1264,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; + pScanInfo->currentTable = 0; pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); @@ -1278,16 +1279,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->pTableScanOp->resultInfo.totalRows = 0; // start from current accessed position - // we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start + // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start // position, let's find it from the beginning. index = tableListFind(pTableListInfo, uid, 0); taosRUnLockLatch(&pTaskInfo->lock); if (index >= 0) { + pScanInfo->currentTable = index; pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, - numOfTables, pScanInfo->tableEndIndex, id); + numOfTables, pScanInfo->currentTable, id); terrno = TSDB_CODE_PAR_INTERNAL_ERROR; return -1; } @@ -1310,12 +1312,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } else { pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1); pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", - uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id); + uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); } // restore the key value diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..6736d6e137 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -939,7 +939,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 >= numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); if (pOperator->dynamicTask) { taosArrayClear(pInfo->base.pTableListInfo->pTableList); @@ -978,9 +978,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - if (pInfo->tableEndIndex == -1) { + if (pInfo->currentGroupId == -1) { int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex + 1 == numOfTables) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } @@ -1034,7 +1034,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } if (pOperator->status == OP_EXEC_DONE) { - pInfo->tableEndIndex = -1; + pInfo->currentGroupId = -1; pOperator->status = OP_OPENED; SSDataBlock* result = NULL; while (true) { @@ -1059,23 +1059,24 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } // if no data, switch to next table and continue scan + pInfo->currentTable++; pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); - if (pInfo->tableEndIndex >= numOfTables) { + if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); taosRUnLockLatch(&pTaskInfo->lock); return NULL; } - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex); + tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, - pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo)); + pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1167,6 +1168,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, if (code != TSDB_CODE_SUCCESS) { goto _error; } + + pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1; pInfo->currentGroupIndex = -1; @@ -1264,6 +1267,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6 pTableScanInfo->base.cond.startVersion = 0; pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->scanTimes = 0; + pTableScanInfo->currentGroupId = -1; pTableScanInfo->tableEndIndex = -1; pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; @@ -2167,6 +2171,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->pTableScanOp->status = OP_OPENED; pTSInfo->scanTimes = 0; + pTSInfo->currentGroupId = -1; pTSInfo->tableEndIndex = -1; } From 9a7ef7fa182f3a350060bd55ba63743ae9e7bf15 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 15:08:50 +0800 Subject: [PATCH 2/6] fix: reuse tableListGetGroupList --- source/libs/executor/src/executil.c | 4 --- source/libs/executor/src/scanoperator.c | 39 ++++++++++++------------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1da7818c3c..19f33d2420 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -646,10 +646,6 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf } } - if (initRemainGroups) { - pTableListInfo->numOfOuputGroups = taosHashGetSize(pTableListInfo->remainGroups); - } - if (tsTagFilterCache) { tableList = taosArrayDup(pTableListInfo->pTableList, NULL); pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6736d6e137..36a7f4c183 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -657,33 +657,17 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) { - pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); - STableListInfo* pTableListInfo = pInfo->base.pTableListInfo; - int32_t numOfTables = tableListGetSize(pTableListInfo); - STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pTableListInfo, pInfo->tableStartIndex); + pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - if (pTableListInfo->oneTableForEachGroup) { - pInfo->tableEndIndex = pInfo->tableStartIndex; - } else if (pTableListInfo->groupOffset) { - pInfo->currentGroupIndex++; - if (pInfo->currentGroupIndex + 1 < pTableListInfo->numOfOuputGroups) { - pInfo->tableEndIndex = pTableListInfo->groupOffset[pInfo->currentGroupIndex + 1] - 1; - } else { - pInfo->tableEndIndex = numOfTables - 1; - } - } else { - pInfo->tableEndIndex = numOfTables - 1; - } + pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); if (!pInfo->needCountEmptyTable) { pInfo->countState = TABLE_COUNT_STATE_END; } else { pInfo->countState = TABLE_COUNT_STATE_SCAN; } - - *pKeyInfo = pStart; - *size = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; } void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { @@ -984,7 +968,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); return NULL; } - + + initNextGroupScan(pInfo, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); @@ -1006,16 +991,28 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; } + uInfo("slzhou 1 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); return result; } while (true) { result = startNextGroupScan(pOperator); if (result || pOperator->status == OP_EXEC_DONE) { + if (result) { + uInfo("slzhou 2 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); + } + else { + uInfo("slzhou 2 null block" ); + } return result; } } - + if (result) { + uInfo("slzhou 3 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); + } + else { + uInfo("slzhou 3 null block" ); + } return result; } From de6b559ab9bd1302bd4fb610868abd0cff524edc Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 16 Jan 2024 15:21:20 +0800 Subject: [PATCH 3/6] fix: remove some uage of table end index --- source/libs/executor/inc/executorInt.h | 1 - source/libs/executor/src/executor.c | 2 -- source/libs/executor/src/scanoperator.c | 3 --- 3 files changed, 6 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2ea23e73f2..007eab40f8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -272,7 +272,6 @@ typedef struct STableScanInfo { SSampleExecInfo sample; // sample execution info int32_t tableStartIndex; // current group scan start int32_t tableEndIndex; // current group scan end - int32_t currentGroupIndex; // current group index of groupOffset int32_t currentGroupId; int32_t currentTable; int8_t scanMode; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1bccc514e4..e872c87237 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1265,7 +1265,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT uid = pTableInfo->uid; ts = INT64_MIN; pScanInfo->currentTable = 0; - pScanInfo->tableEndIndex = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); @@ -1286,7 +1285,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (index >= 0) { pScanInfo->currentTable = index; - pScanInfo->tableEndIndex = index; } else { qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, numOfTables, pScanInfo->currentTable, id); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 36a7f4c183..103166e3e7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1057,7 +1057,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; - pInfo->tableEndIndex++; taosRLockLatch(&pTaskInfo->lock); numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -1169,7 +1168,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1; - pInfo->currentGroupIndex = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false; @@ -2169,7 +2167,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pTSInfo->tableEndIndex = -1; } if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { From 449c1631ef04224b67193a6069044d35f63f3317 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 17 Jan 2024 13:34:37 +0800 Subject: [PATCH 4/6] fix: add numOfOutputGroups back when group order scan --- source/libs/executor/src/executil.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 19f33d2420..fd34a98eca 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2155,6 +2155,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* return code; } + if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList); + if (groupSort || pScanNode->groupOrderScan) { code = sortTableGroup(pTableListInfo); } From f1fb7f5c147d112e20f84a07ca4f40d99e25cd7e Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 Jan 2024 08:43:36 +0800 Subject: [PATCH 5/6] fix: change num of output groups when cout empty group is needed --- source/libs/executor/src/executil.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fd34a98eca..033cda43f2 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2138,6 +2138,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && pScanNode->groupOrderScan){ pTableListInfo->numOfOuputGroups = numOfTables; + } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { + pTableListInfo->numOfOuputGroups = numOfTables; } else { pTableListInfo->numOfOuputGroups = 1; } From a1f7169b639af0f8852d4d953aa15895903a40ae Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 18 Jan 2024 09:52:04 +0800 Subject: [PATCH 6/6] fix: remove uInfo --- source/libs/executor/src/scanoperator.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 103166e3e7..09f4f6ac3c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -991,28 +991,16 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (pOperator->dynamicTask) { result->info.id.groupId = result->info.id.uid; } - uInfo("slzhou 1 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); return result; } while (true) { result = startNextGroupScan(pOperator); if (result || pOperator->status == OP_EXEC_DONE) { - if (result) { - uInfo("slzhou 2 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); - } - else { - uInfo("slzhou 2 null block" ); - } return result; } } - if (result) { - uInfo("slzhou 3 %lu, %lu, %lu", result->info.id.groupId, result->info.id.uid, result->info.rows); - } - else { - uInfo("slzhou 3 null block" ); - } + return result; }