diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f4da82e850..4d6fb11747 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -666,20 +666,19 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* int32_t targetIdx = 0; int32_t sourceIdx = 0; while (targetIdx < colActual) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + if (sourceIdx >= numOfCols) { - tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); - return -1; + tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); + colDataSetNNULL(pColData, 0, numOfRows); + targetIdx++; + continue; } SColData* pCol = taosArrayGet(pCols, sourceIdx); - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); SColVal colVal; - if (pCol->nVal != numOfRows) { - tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - return -1; - } - + tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId); if (pCol->cid < pColData->info.colId) { sourceIdx++; } else if (pCol->cid == pColData->info.colId) { @@ -693,7 +692,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* sourceIdx++; targetIdx++; } else { - colDataSetNNULL(pColData, 0, pCol->nVal); + colDataSetNNULL(pColData, 0, numOfRows); targetIdx++; } } @@ -712,9 +711,6 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* SColVal colVal; tRowGet(pRow, pTSchema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { - // tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in - // schema:%d", - // sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols); sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1cd6a0d9eb..a6613e589a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2084,6 +2084,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; setBlockIntoRes(pInfo, pRes, &defaultWindow, true); + qDebug("doQueueScan after filter get data from log %" PRId64 " rows, version:%" PRId64, pInfo->pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index a2348bdedd..27127e593e 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -229,6 +229,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 0330454b73..94e9babf3c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -16,6 +16,7 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmq_ts4563.py b/tests/system-test/7-tmq/tmq_ts4563.py new file mode 100644 index 0000000000..fc1cc259ce --- /dev/null +++ b/tests/system-test/7-tmq/tmq_ts4563.py @@ -0,0 +1,149 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +from taos import * + +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def consumeTest_TS_4563(self): + tdSql.execute(f'use db_stmt') + + tdSql.query("select ts,k from st") + tdSql.checkRows(2) + + tdSql.execute(f'create topic t_unorder_data as select ts,k from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["t_unorder_data"]) + except TmqError: + tdLog.exit(f"subscribe error") + + cnt = 0 + try: + while True: + res = consumer.poll(1) + print(res) + if not res: + if cnt == 0: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + for block in val: + cnt += len(block.fetchall()) + + if cnt != 2: + tdLog.exit("consume error") + + finally: + consumer.close() + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def newcon(self,host,cfg): + user = "root" + password = "taosdata" + port =6030 + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + print(con) + return con + + def test_stmt_insert_multi(self,conn): + # type: (TaosConnection) -> None + + dbname = "db_stmt" + try: + conn.execute("drop database if exists %s" % dbname) + conn.execute("create database if not exists %s" % dbname) + conn.select_db(dbname) + + conn.execute( + "create table st(ts timestamp, i int, j int, k int)", + ) + # conn.load_table_info("log") + tdLog.debug("statement start") + start = datetime.now() + stmt = conn.statement("insert into st(ts,j) values(?, ?)") + + params = new_multi_binds(2) + params[0].timestamp((1626861392589, 1626861392590)) + params[1].int([3, None]) + + # print(type(stmt)) + tdLog.debug("bind_param_batch start") + stmt.bind_param_batch(params) + tdLog.debug("bind_param_batch end") + stmt.execute() + tdLog.debug("execute end") + end = datetime.now() + print("elapsed time: ", end - start) + assert stmt.affected_rows == 2 + tdLog.debug("close start") + + stmt.close() + + # conn.execute("drop database if exists %s" % dbname) + conn.close() + + except Exception as err: + # conn.execute("drop database if exists %s" % dbname) + conn.close() + raise err + + def run(self): + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + host="localhost" + connectstmt=self.newcon(host,config) + self.test_stmt_insert_multi(connectstmt) + self.consumeTest_TS_4563() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())