diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 8e517eb5f2..fe303bc314 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -364,6 +364,9 @@ static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClea pStmt->sql.siInfo.pTableColsIdx = 0; stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue); } + if (NULL != pStmt->exec.pRequest) { + pStmt->exec.pRequest->body.resInfo.numOfRows = 0; + } } else { if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) { // if (!pStmt->options.asyncExecFn) { diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 0f721d6a6b..3a648042a6 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -47,8 +47,14 @@ void checkError(TAOS_STMT2* stmt, int code) { } } +typedef struct AsyncArgs { + int async_affected_rows; + tsem_t sem; +} AsyncArgs; + void stmtAsyncQueryCb(void* param, TAOS_RES* pRes, int code) { - int affected_rows = taos_affected_rows(pRes); + ((AsyncArgs*)param)->async_affected_rows = taos_affected_rows(pRes); + ASSERT_EQ(tsem_post(&((AsyncArgs*)param)->sem), TSDB_CODE_SUCCESS); return; } @@ -199,7 +205,14 @@ void do_stmt(TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUM // exec int affected = 0; code = taos_stmt2_exec(stmt, &affected); - total_affected += affected; + if (option->asyncExecFn == NULL) { + total_affected += affected; + } else { + AsyncArgs* params = (AsyncArgs*)option->userdata; + code = tsem_wait(¶ms->sem); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + total_affected += params->async_affected_rows; + } checkError(stmt, code); for (int i = 0; i < CTB_NUMS; i++) { @@ -219,9 +232,7 @@ void do_stmt(TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUM taosMemoryFree(tags); } } - if (option->asyncExecFn == NULL) { - ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); - } + ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); for (int i = 0; i < CTB_NUMS; i++) { taosMemoryFree(tbs[i]); } @@ -908,7 +919,11 @@ TEST(stmt2Case, stmt2_stb_insert) { } // async - option = {0, true, true, stmtAsyncQueryCb, NULL}; + AsyncArgs* aa = (AsyncArgs*)taosMemMalloc(sizeof(AsyncArgs)); + aa->async_affected_rows = 0; + ASSERT_EQ(tsem_init(&aa->sem, 0, 0), TSDB_CODE_SUCCESS); + void* param = aa; + option = {0, true, true, stmtAsyncQueryCb, param}; { do_stmt(taos, &option, "insert into stmt2_testdb_1.stb (ts,b,tbname,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true, true); } @@ -926,12 +941,14 @@ TEST(stmt2Case, stmt2_stb_insert) { { do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); } // interlace = 1 - option = {0, true, true, stmtAsyncQueryCb, NULL}; + option = {0, true, true, stmtAsyncQueryCb, param}; { do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); } option = {0, true, true, NULL, NULL}; { do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); } do_query(taos, "drop database if exists stmt2_testdb_1"); + (void)tsem_destroy(&aa->sem); + taosMemFree(aa); taos_close(taos); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 25fa8a8660..f20e2bc0a5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -524,6 +524,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/retention_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/retention_test2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel_createdb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ttl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ttlChangeOnWrite.py diff --git a/tests/system-test/0-others/retention_test2.py b/tests/system-test/0-others/retention_test2.py new file mode 100644 index 0000000000..94c92bd154 --- /dev/null +++ b/tests/system-test/0-others/retention_test2.py @@ -0,0 +1,100 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +import os +import time +from util.log import * + +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +import subprocess +from datetime import datetime, timedelta + + +class TDTestCase: + def _prepare_env1(self): + tdLog.info("============== prepare environment 1 ===============") + + level0_paths = [ + f'{self.dnode_path}/data00', + f'{self.dnode_path}/data01', + # f'{self.dnode_path}/data02', + ] + + cfg = { + f"{level0_paths[0]} 0 1": 'dataDir', + f"{level0_paths[1]} 0 0": 'dataDir', + # f"{level0_paths[2]} 0 0": 'dataDir', + } + + for path in level0_paths: + tdSql.createDir(path) + tdDnodes.stop(1) + tdDnodes.deploy(1, cfg) + tdDnodes.start(1) + + def _create_db_write_and_flush(self, dbname): + tdSql.execute(f'create database {dbname} vgroups 1 stt_trigger 1') + tdSql.execute(f'use {dbname}') + tdSql.execute(f'create table t1 (ts timestamp, a int) ') + tdSql.execute(f'create table t2 (ts timestamp, a int) ') + + now = int(datetime.now().timestamp() * 1000) + + for i in range(1000): + tdSql.execute(f'insert into t1 values ({now + i}, {i})') + + tdSql.execute(f'flush database {dbname}') + + for i in range(1): + tdSql.execute(f'insert into t2 values ({now + i}, {i})') + + tdSql.execute(f'flush database {dbname}') + + def run(self): + self._prepare_env1() + + for dbname in [f'db{i}' for i in range(0, 20)]: + self._create_db_write_and_flush(dbname) + + cmd = f"find {self.dnode_path}/data00 -name 'v*.data' | wc -l" + num_files1 = int(subprocess.check_output(cmd, shell=True).strip()) + + cmd = f"find {self.dnode_path}/data01 -name 'v*.data' | wc -l" + num_files2 = int(subprocess.check_output(cmd, shell=True).strip()) + tdSql.checkEqual(num_files1, num_files2) + + tdDnodes.stop(1) + + time.sleep(10) + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.dnode_path = tdCom.getTaosdPath() + self.cfg_path = f'{self.dnode_path}/cfg' + self.log_path = f'{self.dnode_path}/log' + self.db_name = 'test' + self.vgroups = 10 + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index b3e7e5b4d3..408d9e71c5 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -247,6 +247,7 @@ python3 ./test.py -f 0-others/user_privilege_all.py python3 ./test.py -f 0-others/fsync.py python3 ./test.py -f 0-others/multilevel.py python3 ./test.py -f 0-others/retention_test.py +python3 ./test.py -f 0-others/retention_test2.py python3 ./test.py -f 0-others/ttl.py python3 ./test.py -f 0-others/ttlChangeOnWrite.py python3 ./test.py -f 0-others/compress_tsz1.py