From fe1a7e4b9c138781185d091d9e21d899a21bd4c8 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 29 Mar 2024 11:05:45 +0800 Subject: [PATCH 1/5] feat: add pk col to scan cols of scan logic node of delete operation --- source/libs/planner/src/planLogicCreater.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index c79fbd01f0..b5236fee9e 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1696,6 +1696,11 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p } } + STableMeta* pMeta = ((SRealTableNode*)pDelete->pFromTable)->pMeta; + if (TSDB_CODE_SUCCESS == code && hasPkInTable(pMeta)) { + code = addPkCol(pMeta->uid, pMeta->schema + 1, &pScan->pScanCols, pMeta); + } + if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagCond) { pScan->pTagCond = nodesCloneNode(pDelete->pTagCond); if (NULL == pScan->pTagCond) { From b50462783061cd60df5146b83d13e1de9e540eb7 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 29 Mar 2024 17:01:56 +0800 Subject: [PATCH 2/5] fix: new group block to reset func input iter hasPrev --- include/libs/function/function.h | 7 ++----- source/libs/executor/src/tsort.c | 5 +++-- source/libs/function/src/builtinsimpl.c | 7 ++++++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 78bd1d807b..6c60e1c4a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -212,11 +212,8 @@ typedef struct SFuncInputRowIter { char* pPrevPk; SSDataBlock* pPrevRowBlock; // pre one row block - //TODO: - // int32_t prevStartOffset; // for diff, derivative. - // SPoint1 prevStartPoint; // for twa. - // int32_t startOffset; // for diff, derivative. - // SPoint1 startPoint; // for twa. + uint64_t groupId; + bool hasGroupId; bool finalRow; } SFuncInputRowIter; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 271d3e05a5..44404c345e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1306,6 +1306,7 @@ static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource static void initRowIdSort(SSortHandle* pHandle) { SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; + SColumnInfoData pkCol = {0}; SSDataBlock* pSortInput = createDataBlock(); SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); @@ -1317,7 +1318,7 @@ static void initRowIdSort(SSortHandle* pHandle) { SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); blockDataAppendColInfo(pSortInput, &lengthCol); if (pHandle->bSortPk) { - SColumnInfoData pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); + pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); blockDataAppendColInfo(pSortInput, &pkCol); } blockDataDestroy(pHandle->pDataBlock); @@ -1343,7 +1344,7 @@ static void initRowIdSort(SSortHandle* pHandle) { biPk.order = pkOrder->order; biPk.slotId = 4; biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC); - biPk.compFn = getKeyComparFunc(extPkCol->info.type, biPk.order); + biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order); taosArrayPush(aOrder, &biPk); } taosArrayDestroy(pHandle->pSortInfo); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c029a809eb..fafc313afc 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -434,7 +434,12 @@ int32_t funcInputUpdate(SqlFunctionCtx* pCtx) { pIter->pPkCol = pIter->pInput->pPrimaryKey; pIter->rowIndex = pIter->pInput->startRowIndex; pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1; - pIter->pSrcBlock = pCtx->pSrcBlock; + pIter->pSrcBlock = pCtx->pSrcBlock; + if (!pIter->hasGroupId || pIter->groupId != pIter->pSrcBlock->info.id.groupId) { + pIter->hasGroupId = true; + pIter->groupId = pIter->pSrcBlock->info.id.groupId; + pIter->hasPrev = false; + } } else { pIter->finalRow = true; } From 110ac5bd161642a3250f60c234fbcc39aca1da25 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 29 Mar 2024 17:20:28 +0800 Subject: [PATCH 3/5] feat: add test case for pk func partition by tbname --- tests/parallel_test/cases.task | 1 + tests/system-test/2-query/pk_func_group.py | 284 +++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 tests/system-test/2-query/pk_func_group.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index b33f0050d1..d15b12cfe6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -46,6 +46,7 @@ ,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/pk_func_group.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_expr.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/project_group.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py diff --git a/tests/system-test/2-query/pk_func_group.py b/tests/system-test/2-query/pk_func_group.py new file mode 100644 index 0000000000..c6b2e0f846 --- /dev/null +++ b/tests/system-test/2-query/pk_func_group.py @@ -0,0 +1,284 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-] + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use pk_func_group") + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists pk_func_group") + tdSql.execute("create database if not exists pk_func_group") + tdSql.execute('use pk_func_group') + tdSql.execute('drop database IF EXISTS d1;') + + tdSql.execute('drop database IF EXISTS d2;') + + tdSql.execute('create database d1 vgroups 1') + + tdSql.execute('use d1;') + + tdSql.execute('create table st(ts timestamp, pk int primary key, f int) tags(t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1, 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 2, 2);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', 3, 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', 4, 4);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1, 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 4, 4);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 3, 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2, 2);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', 6, 6);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', 5, 5);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', 8, 8);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', 7, 7);") + + tdSql.query('select first(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, 3) + tdSql.checkData(1, 2, 3) + + tdSql.query('select last(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, 6) + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 8) + tdSql.checkData(1, 2, 8) + + tdSql.query('select last_row(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, 6) + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 8) + tdSql.checkData(1, 2, 8) + + tdSql.query('select ts,diff(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + + tdSql.query('select irate(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 4.0) + tdSql.checkData(1, 0, 5.0) + + tdSql.query('select ts,derivative(f, 1s, 0) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0.0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4.0) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1.0) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5.0) + + tdSql.query('select twa(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 2.0) + tdSql.checkData(1, 0, 3.5) + + tdSql.query('select ts,pk,unique(f) from d1.st partition by tbname order by tbname,ts,pk;') + tdSql.checkRows(10) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, 2) + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, 4) + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(4, 1, 6) + tdSql.checkData(4, 2, 6) + tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(5, 1, 3) + tdSql.checkData(5, 2, 3) + tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(6, 1, 4) + tdSql.checkData(6, 2, 4) + tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(7, 1, 2) + tdSql.checkData(7, 2, 2) + tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(8, 1, 7) + tdSql.checkData(8, 2, 7) + tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(9, 1, 8) + tdSql.checkData(9, 2, 8) + + tdSql.execute('create database d2 vgroups 2') + + tdSql.execute('use d2;') + + tdSql.execute('create table st(ts timestamp, pk int primary key, f int) tags(t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1, 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 2, 2);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', 3, 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:00', 4, 4);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1, 1);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 4, 4);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 3, 3);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2, 2);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', 6, 6);") + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:02', 5, 5);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', 8, 8);") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', 7, 7);") + + tdSql.query('select first(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, 3) + tdSql.checkData(1, 2, 3) + + tdSql.query('select last(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, 6) + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 8) + tdSql.checkData(1, 2, 8) + + tdSql.query('select last_row(*) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(0, 1, 6) + tdSql.checkData(0, 2, 6) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 8) + tdSql.checkData(1, 2, 8) + + tdSql.query('select ts,diff(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + + tdSql.query('select irate(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 4.0) + tdSql.checkData(1, 0, 5.0) + + tdSql.query('select ts,derivative(f, 1s, 0) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(4) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(0, 1, 0.0) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(1, 1, 4.0) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, -1.0) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5.0) + + tdSql.query('select twa(f) from d1.st partition by tbname order by tbname;') + tdSql.checkRows(2) + tdSql.checkData(0, 0, 2.0) + tdSql.checkData(1, 0, 3.5) + + tdSql.query('select ts,pk,unique(f) from d1.st partition by tbname order by tbname,ts,pk;') + tdSql.checkRows(10) + tdSql.checkData(0, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 1) + tdSql.checkData(1, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(1, 1, 2) + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(2, 1, 4) + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(3, 1, 5) + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(4, 1, 6) + tdSql.checkData(4, 2, 6) + tdSql.checkData(5, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(5, 1, 3) + tdSql.checkData(5, 2, 3) + tdSql.checkData(6, 0, datetime.datetime(2021, 4, 19, 0, 0)) + tdSql.checkData(6, 1, 4) + tdSql.checkData(6, 2, 4) + tdSql.checkData(7, 0, datetime.datetime(2021, 4, 19, 0, 0, 1)) + tdSql.checkData(7, 1, 2) + tdSql.checkData(7, 2, 2) + tdSql.checkData(8, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(8, 1, 7) + tdSql.checkData(8, 2, 7) + tdSql.checkData(9, 0, datetime.datetime(2021, 4, 19, 0, 0, 2)) + tdSql.checkData(9, 1, 8) + tdSql.checkData(9, 2, 8) + + tdSql.execute('drop database pk_func_group') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From 55bb6ab341b524d99ac71d450331e47d3f28a052 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Mar 2024 18:15:03 +0800 Subject: [PATCH 4/5] fix(tsdb): set correct initial value for compare --- include/common/tdataformat.h | 1 + source/common/src/tdataformat.c | 21 ++++++ source/dnode/vnode/src/tq/tq.c | 1 - source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 30 +++++++-- source/dnode/vnode/src/tsdb/tsdbUtil.c | 21 ------ source/libs/executor/src/timesliceoperator.c | 65 +++++++++++++++---- source/libs/executor/src/timewindowoperator.c | 5 +- source/util/test/tbaseCodecTest.cpp | 5 -- 8 files changed, 101 insertions(+), 48 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 73e6837475..6c8a39caf6 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -126,6 +126,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); void tRowGetKey(SRow *pRow, SRowKey *key); int32_t tRowKeyCompare(const void *p1, const void *p2); +int32_t tRowKeyAssign(SRowKey* pDst, SRowKey* pSrc); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index cf8e28bf9a..ceb572edc3 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1286,6 +1286,27 @@ int32_t tRowKeyCompare(const void *p1, const void *p2) { return 0; } +int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { + pDst->ts = pSrc->ts; + pDst->numOfPKs = pSrc->numOfPKs; + + if (pSrc->numOfPKs > 0) { + for (int32_t i = 0; i < pSrc->numOfPKs; ++i) { + SValue *pVal = &pDst->pks[i]; + pVal->type = pSrc->pks[i].type; + + if (IS_NUMERIC_TYPE(pVal->type)) { + pVal->val = pSrc->pks[i].val; + } else { + memcpy(pVal->pData, pVal->pData, pVal->nData); + pVal->nData = pSrc->pks[i].nData; + } + } + } + + return TSDB_CODE_SUCCESS; +} + // STag ======================================== static int tTagValCmprFn(const void *p1, const void *p2) { if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ffebd783ac..141fe88339 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1183,7 +1183,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } } else { -// ASSERT(status == TASK_STATUS__HALT); if (status != TASK_STATUS__HALT) { tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 99f0e42261..86c6b70c92 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -130,14 +130,29 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c return *p; } -static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len) { +static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) { pKey->numOfPKs = numOfPks; pKey->ts = ts; if (numOfPks > 0) { pKey->pks[0].type = type; if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { - pKey->pks[0].val = INT64_MIN; + char* p = (char*)&pKey->pks[0].val; + if (asc) { + switch(pKey->pks[0].type) { + case TSDB_DATA_TYPE_BIGINT:*(int64_t*)p = INT64_MIN;break; + case TSDB_DATA_TYPE_INT:*(int32_t*)p = INT32_MIN;break; + case TSDB_DATA_TYPE_SMALLINT:*(int16_t*)p = INT16_MIN;break; + case TSDB_DATA_TYPE_TINYINT:*(int8_t*)p = INT8_MIN;break; + } + } else { + switch(pKey->pks[0].type) { + case TSDB_DATA_TYPE_BIGINT:*(int64_t*)p = INT64_MAX;break; + case TSDB_DATA_TYPE_INT:*(int32_t*)p = INT32_MAX;break; + case TSDB_DATA_TYPE_SMALLINT:*(int16_t*)p = INT16_MAX;break; + case TSDB_DATA_TYPE_TINYINT:*(int8_t*)p = INT8_MAX;break; + } + } } else { pKey->pks[0].pData = taosMemoryCalloc(1, len); pKey->pks[0].nData = 0; @@ -154,22 +169,23 @@ static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); SRowKey* pRowKey = &pScanInfo->lastProcKey; - if (ASCENDING_TRAVERSE(pReader->info.order)) { + if (asc) { int64_t skey = pReader->info.window.skey; int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey; - initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes); + initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc); initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type, - pReader->suppInfo.pk.bytes); + pReader->suppInfo.pk.bytes, asc); } else { int64_t ekey = pReader->info.window.ekey; int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes); + initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc); initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type, - pReader->suppInfo.pk.bytes); + pReader->suppInfo.pk.bytes, asc); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 59452ebb9d..2b8de9aff3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -628,27 +628,6 @@ void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) { } } -int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { - pDst->ts = pSrc->ts; - pDst->numOfPKs = pSrc->numOfPKs; - - if (pSrc->numOfPKs > 0) { - for (int32_t i = 0; i < pSrc->numOfPKs; ++i) { - SValue *pVal = &pDst->pks[i]; - pVal->type = pSrc->pks[i].type; - - if (IS_NUMERIC_TYPE(pVal->type)) { - pVal->val = pSrc->pks[i].val; - } else { - memcpy(pVal->pData, pVal->pData, pVal->nData); - pVal->nData = pSrc->pks[i].nData; - } - } - } - - return TSDB_CODE_SUCCESS; -} - int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) { int32_t c = tRowKeyCompare(&key1->key, &key2->key); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 4b86f85d53..9b28a203b8 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo { SColumn tsCol; // primary timestamp column SExprSupp scalarSup; // scalar calculation struct SFillColInfo* pFillColInfo; // fill column info - int64_t prevTs; + SRowKey prevKey; bool prevTsSet; uint64_t groupId; SGroupKeys* pPrevGroupKey; @@ -178,22 +178,49 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); } +static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) { + pKey->ts = ts; + pKey->numOfPKs = 1; + + int8_t t = pPkCol->info.type; + + pKey->pks[0].type = t; + if (IS_NUMERIC_TYPE(t)) { + GET_TYPED_DATA(pKey->pks[0].val, int64_t, t, colDataGetNumData(pPkCol, rowIndex)); + } else { + char* p = colDataGetVarData(pPkCol, rowIndex); + pKey->pks[0].pData = (uint8_t*)varDataVal(p); + pKey->pks[0].nData = varDataLen(p); + } +} + static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol, - int32_t curIndex, int32_t rows) { - - + SColumnInfoData* pPkCol, int32_t curIndex, int32_t rows) { int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex); if (currentTs > pSliceInfo->win.ekey) { return false; } - if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) { - return true; + SRowKey cur = {.ts = currentTs, .numOfPKs = (pPkCol != NULL)? 1:0}; + if (pPkCol != NULL) { + cur.pks[0].type = pPkCol->info.type; + } + + if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) { +// if (pPkCol == NULL) { + return true; + /* } else { + tRowGetKeyFromColData(currentTs, pPkCol, curIndex, &cur); + if (tRowKeyCompare(&cur, &pSliceInfo->prevKey) == 0) { + return true; + } + }*/ } pSliceInfo->prevTsSet = true; - pSliceInfo->prevTs = currentTs; + tRowKeyAssign(&pSliceInfo->prevKey, &cur); + // todo handle next if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) { int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); if (currentTs == nextTs) { @@ -695,14 +722,20 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS SInterval* pInterval = &pSliceInfo->interval; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + SColumnInfoData* pPkCol = NULL; + + if (pSliceInfo->hasPk) { + pPkCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->pkCol.slotId); + } int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex; for (; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); // check for duplicate timestamps - if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); + if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) { + continue; +// T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); } if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { @@ -875,11 +908,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STimeSliceOperatorInfo* pSliceInfo = pOperator->info; SSDataBlock* pResBlock = pSliceInfo->pRes; - SOperatorInfo* downstream = pOperator->pDownstream[0]; blockDataCleanup(pResBlock); while (1) { @@ -1017,13 +1048,23 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; pInfo->prevTsSet = false; - pInfo->prevTs = 0; + pInfo->prevKey.ts = INT64_MIN; pInfo->groupId = 0; pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; pInfo->remainIndex = 0; + if (pInfo->hasPk) { + pInfo->prevKey.numOfPKs = 1; + pInfo->prevKey.ts = INT64_MIN; + pInfo->prevKey.pks[0].type = pInfo->pkCol.type; + + if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { + pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes); + } + } + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; pScanInfo->base.cond.twindows = pInfo->win; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 57c038e75a..57cc0e82d2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -658,11 +658,12 @@ static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* */ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) { if (!pOperatorInfo->limited // if no limit info, no filter will be applied - || pOperatorInfo->binfo.inputTsOrder != - pOperatorInfo->binfo.outputTsOrder // if input/output ts order mismatch, no filter + || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder + // if input/output ts order mismatch, no filter ) { return false; } + if (pOperatorInfo->limit == 0) return true; if (pOperatorInfo->pBQ == NULL) { diff --git a/source/util/test/tbaseCodecTest.cpp b/source/util/test/tbaseCodecTest.cpp index 4c56979885..63bbfcaa68 100644 --- a/source/util/test/tbaseCodecTest.cpp +++ b/source/util/test/tbaseCodecTest.cpp @@ -17,11 +17,6 @@ using namespace std; #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} - static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) { int64_t start = taosGetTimestampUs(); char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen); From dd2f6287ec96214176a506df136f821bad0e483b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Mar 2024 18:59:29 +0800 Subject: [PATCH 5/5] fix(tsdb): fix rowkey assign error. --- source/common/src/tdataformat.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index ceb572edc3..991c17c5cc 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1298,7 +1298,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { if (IS_NUMERIC_TYPE(pVal->type)) { pVal->val = pSrc->pks[i].val; } else { - memcpy(pVal->pData, pVal->pData, pVal->nData); + memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); pVal->nData = pSrc->pks[i].nData; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c7164d43de..5469eae1cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2068,8 +2068,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return TSDB_CODE_SUCCESS; } - STsdbRowKey startKey = {0}; - tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey); + STsdbRowKey startKey; + startKey.key = pBlockScanInfo->lastProcKey; startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer; if ((asc && (startKey.key.ts < pWindow->skey)) || ((!asc) && startKey.key.ts > pWindow->ekey)) { startKey.key.ts = asc? pWindow->skey:pWindow->ekey;