From 07301130c5e19676f0b620d0153d392edbdea6f2 Mon Sep 17 00:00:00 2001 From: facetosea <25808407@qq.com> Date: Tue, 5 Dec 2023 11:18:07 +0800 Subject: [PATCH 1/7] enh: event_window + super table need partition by tbname --- source/libs/parser/src/parTranslater.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5c30384a6b..1ef74349b0 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7388,6 +7388,11 @@ static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) { static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + int8_t tableType = ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType; + if (TSDB_SUPER_TABLE == tableType && !hasPartitionByTbname(((SSelectStmt*)pStmt->pQuery)->pPartitionByList)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream query on super tables must patitioned by table name"); + } if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { From 2019e95049ceb18c41b8ca3b9aec48e6b4e3cad6 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 5 Dec 2023 14:08:17 +0800 Subject: [PATCH 2/7] fix: stream for event window --- source/libs/parser/src/parTranslater.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1ef74349b0..ac92c9a427 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7388,8 +7388,10 @@ static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) { static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - int8_t tableType = ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType; - if (TSDB_SUPER_TABLE == tableType && !hasPartitionByTbname(((SSelectStmt*)pStmt->pQuery)->pPartitionByList)) { + if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta + && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType + && !hasPartitionByTbname(pSelect->pPartitionByList) + && pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Stream query on super tables must patitioned by table name"); } From f91d98df2588b1a8b7db8753506d123edfb2cb88 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 5 Dec 2023 14:11:58 +0800 Subject: [PATCH 3/7] fix: error message --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ac92c9a427..88bfcec682 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7393,7 +7393,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm && !hasPartitionByTbname(pSelect->pPartitionByList) && pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream query on super tables must patitioned by table name"); + "Event window for stream on super table must patitioned by table name"); } if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || From a6c876bbc15f082bc144306c8eaee03256fdb03e Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 5 Dec 2023 14:16:01 +0800 Subject: [PATCH 4/7] fix: last/last_row crash with dropped column --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 53 ++++++-- source/libs/executor/src/cachescanoperator.c | 6 + tests/system-test/2-query/last_cache_scan.py | 135 +++++++++++++++++++ 3 files changed, 185 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b6aa791cf0..f668ea5f72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,19 +22,34 @@ #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) +static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) { + char *buf = taosMemoryCalloc(1, pCol->info.bytes); + SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE); + pRes->bytes = 0; + pRes->hasResult = true; + pRes->isNull = true; + varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE); + colDataSetVal(pCol, row, buf, false); + taosMemoryFree(buf); +} + static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { - uint64_t ts = 0; - SFirstLastRes* p; - col_id_t colId; + uint64_t ts = TSKEY_MIN; + SFirstLastRes* p = NULL; + col_id_t colId = -1; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); - int32_t slotId = slotIds[i]; - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + if (slotIds[i] == -1) { + setFirstLastResColToNull(pColInfoData, numOfRows); + continue; + } + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); colId = pColVal->colVal.cid; p = (SFirstLastRes*)varDataVal(pRes[i]); @@ -63,10 +78,14 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataSetVal(pCol, numOfRows, (const char*)&ts, false); + if (ts == TSKEY_MIN) { + colDataSetNULL(pCol, numOfRows); + } else { + colDataSetVal(pCol, numOfRows, (const char*)&ts, false); + } continue; - } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) { - if (!p->isNull) { + } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) { + if (p && !p->isNull) { colDataSetVal(pCol, numOfRows, p->buf, false); } else { colDataSetNULL(pCol, numOfRows); @@ -81,6 +100,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); int32_t slotId = slotIds[i]; + if (slotId == -1) { + colDataSetNULL(pColInfoData, numOfRows); + continue; + } SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); SColVal* pVal = &pColVal->colVal; @@ -300,7 +323,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } for (int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE); + int32_t bytes; + if (slotIds[j] == -1) + bytes = 1; + else + bytes = pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } @@ -324,6 +353,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t i = 0; i < pr->numOfCols; ++i) { int32_t slotId = slotIds[i]; + if (slotId == -1) { + SLastCol p = {.ts = INT64_MIN, .colVal.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL}; + taosArrayPush(pLastCols, &p); + continue; + } struct STColumn* pCol = &pr->pSchema->columns[slotId]; SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; @@ -348,6 +382,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 bool hasNotNullRow = true; int64_t singleTableLastTs = INT64_MAX; for (int32_t k = 0; k < pr->numOfCols; ++k) { + if (slotIds[k] == -1) continue; SLastCol* p = taosArrayGet(pLastCols, k); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6d59698855..75fe3c51dd 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -352,6 +352,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); + bool found = false; for (int32_t j = 0; j < pWrapper->nCols; ++j) { /* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->dstSlotId] = -1; @@ -361,9 +362,14 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask if (pColMatch->colId == pWrapper->pSchema[j].colId) { (*pSlotIds)[i] = j; (*pDstSlotIds)[i] = pColMatch->dstSlotId; + found = true; break; } } + if (!found) { + (*pSlotIds)[i] = -1; + (*pDstSlotIds)[i] = pColMatch->dstSlotId; + } } return TSDB_CODE_SUCCESS; diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index 0f0936ebab..01795f6eef 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -1,3 +1,4 @@ +from sqlite3 import ProgrammingError import taos import sys import time @@ -319,10 +320,144 @@ class TDTestCase: tdSql.checkData(0, 0, '2018-11-25 19:30:00.000') tdSql.checkData(0, 1, '2018-11-25 19:30:01.000') + def test_cache_scan_with_drop_and_add_column(self): + tdSql.query("select last(c10) from meters") + tdSql.checkData(0, 0, '2018-11-25 19:30:01') + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query("select last(c10) from meters", queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.query('select last(*) from meters', queryTimes=1) + tdSql.checkData(0, 10, None) + tdSql.query('select last(c10), c10, ts from meters', queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + def test_cache_scan_with_drop_and_add_column2(self): + tdSql.query("select last(c1) from meters") + tdSql.checkData(0, 0, '999') + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"]) + p.check_returncode() + tdSql.query("select last(c1) from meters", queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.query('select last(*) from meters', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkData(0, 1, None) + tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + try: + tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1) + except Exception as e: + if str(e).count('Invalid column name') == 1: + print('column has been dropped, the cache has been updated: %s' % (str(e))) + return + else: + raise + tdSql.checkRows(1) + tdSql.checkCols(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, None) + + try: + tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1) + except Exception as e: + if str(e).count('Invalid column name') == 1: + print('column has been dropped, the cache has been updated: %s' % (str(e))) + return + else: + raise + tdSql.checkRows(1) + tdSql.checkCols(3) + tdSql.checkData(0, 0, None) + + def test_cache_scan_with_drop_column(self): + tdSql.query('select last(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c9"]) + p.check_returncode() + tdSql.query('select last(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + tdSql.checkData(0, 9, None) + + def test_cache_scan_last_row_with_drop_column(self): + tdSql.query('select last_row(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + tdSql.checkData(0, 10, None) + + def test_cache_scan_last_row_with_drop_column2(self): + tdSql.query('select last_row(c1) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkData(0, 0, None) + + def test_cache_scan_last_row_with_partition_by(self): + tdSql.query('select last(c1) from meters partition by t1') + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(5) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + tdSql.checkData(2, 0, None) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) + + def test_cache_scan_last_row_with_partition_by_tbname(self): + tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(10) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(10) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + tdSql.checkData(2, 0, None) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) + + + def run(self): self.prepareTestEnv() #time.sleep(99999999) self.test_last_cache_scan() + #self.test_cache_scan_with_drop_and_add_column() + self.test_cache_scan_with_drop_and_add_column2() + #self.test_cache_scan_with_drop_column() + #self.test_cache_scan_last_row_with_drop_column() + #self.test_cache_scan_last_row_with_drop_column2() + #self.test_cache_scan_last_row_with_partition_by() + #self.test_cache_scan_last_row_with_partition_by_tbname() def stop(self): tdSql.close() From cb3d9551117f81ab37ff70306938c65307cbb45c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 5 Dec 2023 16:19:56 +0800 Subject: [PATCH 5/7] code coverage --- tests/parallel_test/cases.task | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cf4559100b..ef222db0f9 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1132,6 +1132,9 @@ e ,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ,,y,script,./test.sh -f tsim/stream/distributeSession0.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim +,,y,script,./test.sh -f tsim/stream/event0.sim +,,y,script,./test.sh -f tsim/stream/event1.sim +,,y,script,./test.sh -f tsim/stream/event2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim From b642f0fc83060c7bb7961d9e6cbadc073242c3cd Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 5 Dec 2023 17:51:48 +0800 Subject: [PATCH 6/7] enh: adjust ENodeType for show statements nodes --- include/common/tmsg.h | 56 +++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b42df78e34..aff1bd55e4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -295,7 +295,36 @@ typedef enum ENodeType { QUERY_NODE_SYNCDB_STMT, QUERY_NODE_GRANT_STMT, QUERY_NODE_REVOKE_STMT, - QUERY_NODE_SHOW_DNODES_STMT, + // placeholder for [152, 180] + QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181, + QUERY_NODE_SHOW_CREATE_DATABASE_STMT, + QUERY_NODE_SHOW_CREATE_TABLE_STMT, + QUERY_NODE_SHOW_CREATE_STABLE_STMT, + QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT, + QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT, + QUERY_NODE_SHOW_SCORES_STMT, + QUERY_NODE_SHOW_TABLE_TAGS_STMT, + QUERY_NODE_KILL_CONNECTION_STMT, + QUERY_NODE_KILL_QUERY_STMT, + QUERY_NODE_KILL_TRANSACTION_STMT, + QUERY_NODE_DELETE_STMT, + QUERY_NODE_INSERT_STMT, + QUERY_NODE_QUERY, + QUERY_NODE_SHOW_DB_ALIVE_STMT, + QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT, + QUERY_NODE_BALANCE_VGROUP_LEADER_STMT, + QUERY_NODE_RESTORE_DNODE_STMT, + QUERY_NODE_RESTORE_QNODE_STMT, + QUERY_NODE_RESTORE_MNODE_STMT, + QUERY_NODE_RESTORE_VNODE_STMT, + QUERY_NODE_PAUSE_STREAM_STMT, + QUERY_NODE_RESUME_STREAM_STMT, + QUERY_NODE_CREATE_VIEW_STMT, + QUERY_NODE_DROP_VIEW_STMT, + + // show statement nodes + // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' + QUERY_NODE_SHOW_DNODES_STMT = 400, QUERY_NODE_SHOW_MNODES_STMT, QUERY_NODE_SHOW_MODULES_STMT, QUERY_NODE_SHOW_QNODES_STMT, @@ -324,31 +353,6 @@ typedef enum ENodeType { QUERY_NODE_SHOW_VNODES_STMT, QUERY_NODE_SHOW_USER_PRIVILEGES_STMT, QUERY_NODE_SHOW_VIEWS_STMT, - QUERY_NODE_SHOW_CREATE_VIEW_STMT, - QUERY_NODE_SHOW_CREATE_DATABASE_STMT, - QUERY_NODE_SHOW_CREATE_TABLE_STMT, - QUERY_NODE_SHOW_CREATE_STABLE_STMT, - QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT, - QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT, - QUERY_NODE_SHOW_SCORES_STMT, - QUERY_NODE_SHOW_TABLE_TAGS_STMT, - QUERY_NODE_KILL_CONNECTION_STMT, - QUERY_NODE_KILL_QUERY_STMT, - QUERY_NODE_KILL_TRANSACTION_STMT, - QUERY_NODE_DELETE_STMT, - QUERY_NODE_INSERT_STMT, - QUERY_NODE_QUERY, - QUERY_NODE_SHOW_DB_ALIVE_STMT, - QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT, - QUERY_NODE_BALANCE_VGROUP_LEADER_STMT, - QUERY_NODE_RESTORE_DNODE_STMT, - QUERY_NODE_RESTORE_QNODE_STMT, - QUERY_NODE_RESTORE_MNODE_STMT, - QUERY_NODE_RESTORE_VNODE_STMT, - QUERY_NODE_PAUSE_STREAM_STMT, - QUERY_NODE_RESUME_STREAM_STMT, - QUERY_NODE_CREATE_VIEW_STMT, - QUERY_NODE_DROP_VIEW_STMT, // logic plan node QUERY_NODE_LOGIC_PLAN_SCAN = 1000, From cc51fd47d0bc7812cc8cc638177c0a23d523ef54 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 5 Dec 2023 19:09:29 +0800 Subject: [PATCH 7/7] fix:offset out of range if poll offset is read from offsetRestored while wal is deleted --- source/dnode/vnode/src/tq/tqUtil.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8f62928d22..110cf79b4e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -168,6 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } + dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : {