Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-27676

This commit is contained in:
wangmm0220 2023-12-07 16:33:10 +08:00
commit 865906eaae
44 changed files with 397 additions and 91 deletions

View File

@ -20,7 +20,7 @@ def get_ts(ts: str):
def create_stable():
conn = taos.connect()
try:
conn.execute("CREATE DATABASE power")
conn.execute("CREATE DATABASE power keep 36500")
conn.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
finally:

View File

@ -4,7 +4,7 @@ import taos
taos_conn = taos.connect()
taos_conn.execute('drop database if exists power')
taos_conn.execute('create database if not exists power wal_retention_period 3600')
taos_conn.execute('create database if not exists power wal_retention_period 3600 keep 36500 ')
taos_conn.execute("use power")
taos_conn.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

View File

@ -11,7 +11,7 @@ conn = connect(url="http://localhost:6041",
# create STable
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE DATABASE power keep 36500 ")
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

View File

@ -11,7 +11,7 @@ conn = connect(url="http://localhost:6041",
# create STable
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power", req_id=1)
cursor.execute("CREATE DATABASE power", req_id=2)
cursor.execute("CREATE DATABASE power keep 36500", req_id=2)
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)", req_id=3)

View File

@ -6,13 +6,13 @@ conn = taosws.connect("taosws://root:taosdata@localhost:6041")
# ANCHOR: basic
conn.execute("drop database if exists connwspy")
conn.execute("create database if not exists connwspy wal_retention_period 3600")
conn.execute("create database if not exists connwspy wal_retention_period 3600 keep 36500 ")
conn.execute("use connwspy")
conn.execute("create table if not exists stb (ts timestamp, c1 int) tags (t1 int)")
conn.execute("create table if not exists tb1 using stb tags (1)")
conn.execute("insert into tb1 values (now, 1)")
conn.execute("insert into tb1 values (now, 2)")
conn.execute("insert into tb1 values (now, 3)")
conn.execute("insert into tb1 values (now+1s, 2)")
conn.execute("insert into tb1 values (now+2s, 3)")
r = conn.execute("select * from stb")
result = conn.query("select * from stb")

View File

@ -6,7 +6,7 @@ conn = taosws.connect("taosws://root:taosdata@localhost:6041")
# ANCHOR: basic
conn.execute("drop database if exists connwspy", req_id=1)
conn.execute("create database if not exists connwspy", req_id=2)
conn.execute("create database if not exists connwspy keep 36500", req_id=2)
conn.execute("use connwspy", req_id=3)
conn.execute("create table if not exists stb (ts timestamp, c1 int) tags (t1 int)", req_id=4)
conn.execute("create table if not exists tb1 using stb tags (1)", req_id=5)

View File

@ -4,7 +4,7 @@ import taos
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE DATABASE test keep 36500")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")

View File

@ -4,7 +4,7 @@ import taos
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test", req_id=1)
conn.execute("CREATE DATABASE test", req_id=2)
conn.execute("CREATE DATABASE test keep 36500", req_id=2)
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)", req_id=3)

View File

@ -4,7 +4,7 @@ conn = taos.connect()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS test")
cursor.execute("CREATE DATABASE test")
cursor.execute("CREATE DATABASE test keep 36500")
cursor.execute("USE test")
cursor.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")

View File

@ -4,7 +4,7 @@ conn = taos.connect()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS test", req_id=1)
cursor.execute("CREATE DATABASE test", req_id=2)
cursor.execute("CREATE DATABASE test keep 36500", req_id=2)
cursor.execute("USE test", req_id=3)
cursor.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)", req_id=4)

View File

@ -160,7 +160,7 @@ def main(infinity):
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE IF NOT EXISTS test")
conn.execute("CREATE DATABASE IF NOT EXISTS test keep 36500")
conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn.close()

View File

@ -16,7 +16,7 @@ def get_connection():
def create_database(conn):
conn.execute("CREATE DATABASE test")
conn.execute("CREATE DATABASE test keep 36500")
conn.execute("USE test")

View File

@ -5,7 +5,7 @@ LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanD
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
'California.SantaClara', 'California.Cupertino']
CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1 wal_retention_period 3600'
CREATE_DATABASE_SQL = 'create database if not exists {} keep 36500 duration 10 buffer 16 wal_level 1 wal_retention_period 3600'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'

View File

@ -15,7 +15,7 @@ def get_connection():
def create_database(conn):
# the default precision is ms (microsecond), but we use us(microsecond) here.
conn.execute("CREATE DATABASE test precision 'us'")
conn.execute("CREATE DATABASE test precision 'us' keep 36500")
conn.execute("USE test")

View File

@ -71,7 +71,7 @@ def insert_data():
def create_stable():
conn = taos.connect()
try:
conn.execute("CREATE DATABASE power")
conn.execute("CREATE DATABASE power keep 36500")
conn.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
finally:

View File

@ -18,7 +18,7 @@ def get_connection() -> taos.TaosConnection:
def create_stable(conn: taos.TaosConnection):
conn.execute("CREATE DATABASE power")
conn.execute("CREATE DATABASE power keep 36500")
conn.execute("USE power")
conn.execute("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")

View File

@ -2,7 +2,7 @@ import taos
conn = taos.connect()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE DATABASE test keep 36500")
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
# prepare data

View File

@ -2,7 +2,7 @@ import taos
conn = taos.connect()
conn.execute("DROP DATABASE IF EXISTS test", req_id=1)
conn.execute("CREATE DATABASE test", req_id=2)
conn.execute("CREATE DATABASE test keep 36500", req_id=2)
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)", req_id=3)
# prepare data

View File

@ -3,7 +3,7 @@ import taos
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)
lines = [

View File

@ -10,9 +10,9 @@ try:
conn.execute("drop database if exists %s" % dbname)
if taos.IS_V3:
conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname)
conn.execute("create database if not exists %s schemaless 1 precision 'ns' keep 36500" % dbname)
else:
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns' keep 36500" % dbname)
conn.select_db(dbname)

View File

@ -10,9 +10,9 @@ try:
conn.execute("drop database if exists %s" % dbname)
if taos.IS_V3:
conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname)
conn.execute("create database if not exists %s schemaless 1 precision 'ns' keep 36500" % dbname)
else:
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns' keep 36500" % dbname)
conn.select_db(dbname)

View File

@ -10,9 +10,9 @@ try:
conn.execute("drop database if exists %s" % dbname)
if taos.IS_V3:
conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname)
conn.execute("create database if not exists %s schemaless 1 precision 'ns' keep 36500" % dbname)
else:
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns' keep 36500" % dbname)
conn.select_db(dbname)

View File

@ -4,7 +4,7 @@ from taos import SmlProtocol, SmlPrecision
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)
lines = [

View File

@ -4,7 +4,7 @@ from taos import SmlProtocol, SmlPrecision
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.execute("create database if not exists %s precision 'us' keep 36500" % dbname)
conn.select_db(dbname)
lines = [

View File

@ -10,7 +10,7 @@ class SQLWriter:
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("create database if not exists test")
self._conn.execute("create database if not exists test keep 36500")
self._conn.execute("USE test")
def get_max_sql_length(self):

View File

@ -10,7 +10,7 @@ db_name = 'test_ws_stmt'
def before():
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.execute("create database %s keep 36500" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(

View File

@ -9,7 +9,7 @@ import taos
def before_test(db_name):
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.execute("create database %s keep 36500" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(

View File

@ -19,7 +19,7 @@ def get_connection():
def create_database(conn):
conn.execute("CREATE DATABASE test")
conn.execute("CREATE DATABASE test keep 36500")
conn.execute("USE test")

View File

@ -7,7 +7,7 @@ def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600 keep 36500")
conn.select_db("tmq_assignment_demo_db")
conn.execute(
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")

View File

@ -6,7 +6,7 @@ def init_tmq_env(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {} wal_retention_period 3600".format(db))
conn.execute("create database if not exists {} wal_retention_period 3600 keep 36500".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")

View File

@ -6,7 +6,7 @@ def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600 keep 36500")
conn.select_db("tmq_assignment_demo_db")
conn.execute(
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")

View File

@ -295,7 +295,36 @@ typedef enum ENodeType {
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
// placeholder for [152, 180]
QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
// show statement nodes
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
QUERY_NODE_SHOW_DNODES_STMT = 400,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
@ -324,31 +353,6 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_VNODES_STMT,
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT,
QUERY_NODE_SHOW_VIEWS_STMT,
QUERY_NODE_SHOW_CREATE_VIEW_STMT,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,

View File

@ -732,7 +732,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return -1;
}
newstr[0] = '"';
strncpy(newstr+1, str, len);
memcpy(newstr+1, str, len);
newstr[len + 1] = '"';
newstr[len + 2] = '\0';
str = newstr;

View File

@ -168,6 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {

View File

@ -1797,8 +1797,8 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}
if (record.version <= pReader->info.verRange.maxVer) {
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
/*tsdbError("tomb xx load/cache: vgId:%d fid:%d record %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);

View File

@ -22,17 +22,32 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
char *buf = taosMemoryCalloc(1, pCol->info.bytes);
SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE);
pRes->bytes = 0;
pRes->hasResult = true;
pRes->isNull = true;
varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pCol, row, buf, false);
taosMemoryFree(buf);
}
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = 0;
SFirstLastRes* p;
col_id_t colId;
uint64_t ts = TSKEY_MIN;
SFirstLastRes* p = NULL;
col_id_t colId = -1;
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
if (slotIds[i] == -1) {
setFirstLastResColToNull(pColInfoData, numOfRows);
continue;
}
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
colId = pColVal->colVal.cid;
@ -63,10 +78,14 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
if (ts == TSKEY_MIN) {
colDataSetNULL(pCol, numOfRows);
} else {
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
}
continue;
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) {
if (!p->isNull) {
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) {
if (p && !p->isNull) {
colDataSetVal(pCol, numOfRows, p->buf, false);
} else {
colDataSetNULL(pCol, numOfRows);
@ -81,6 +100,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t slotId = slotIds[i];
if (slotId == -1) {
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal;
@ -300,7 +323,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
int32_t bytes;
if (slotIds[j] == -1)
bytes = 1;
else
bytes = pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
@ -324,6 +353,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->numOfCols; ++i) {
int32_t slotId = slotIds[i];
if (slotId == -1) {
SLastCol p = {.ts = INT64_MIN, .colVal.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
taosArrayPush(pLastCols, &p);
continue;
}
struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
@ -348,6 +382,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
bool hasNotNullRow = true;
int64_t singleTableLastTs = INT64_MAX;
for (int32_t k = 0; k < pr->numOfCols; ++k) {
if (slotIds[k] == -1) continue;
SLastCol* p = taosArrayGet(pLastCols, k);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);

View File

@ -352,6 +352,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
bool found = false;
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->dstSlotId] = -1;
@ -361,9 +362,14 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[i] = j;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
found = true;
break;
}
}
if (!found) {
(*pSlotIds)[i] = -1;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
}
}
return TSDB_CODE_SUCCESS;

View File

@ -189,7 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
int64_t lastTs = TSKEY_MIN;
bool ignoreRow = false;
bool updateLastRow = false;
bool disorderTs = false;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
@ -249,7 +249,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
} else {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
if (*(int64_t*)var == lastTs) {
ignoreRow = true;
updateLastRow = true;
} else if (*(int64_t*)var < lastTs) {
disorderTs = true;
} else {
@ -269,15 +269,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
break;
}
if (ignoreRow) {
break;
}
}
if (ignoreRow) {
ignoreRow = false;
continue;
}
SRow* pRow = NULL;
@ -285,7 +276,14 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
if (updateLastRow) {
updateLastRow = false;
SRow** lastRow = taosArrayPop(tbData.aRowP);
tRowDestroy(*lastRow);
taosArrayPush(tbData.aRowP, &pRow);
} else {
taosArrayPush(tbData.aRowP, &pRow);
}
}
if (disorderTs) {

View File

@ -4622,11 +4622,15 @@ static int32_t addOrderByPrimaryKeyToQueryImpl(STranslateContext* pCxt, SNode* p
return TSDB_CODE_OUT_OF_MEMORY;
}
((SExprNode*)pOrderByExpr->pExpr)->orderAlias = true;
NODES_DESTORY_LIST(*pOrderByList);
// NODES_DESTORY_LIST(*pOrderByList);
return nodesListMakeStrictAppend(pOrderByList, (SNode*)pOrderByExpr);
}
static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) {
SNodeList* pOrederList = ((SSelectStmt*)pStmt)->pOrderByList;
if (pOrederList && pOrederList->length > 0) {
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList);
}
@ -7388,6 +7392,13 @@ static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList)
&& pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Event window for stream on super table must patitioned by table name");
}
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) {

View File

@ -1033,6 +1033,7 @@ static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
strcpy(pCol->colName, pExpr->aliasName);
}
strcpy(pCol->node.aliasName, pExpr->aliasName);
strcpy(pCol->node.userAlias, pExpr->userAlias);
pCol->node.resType = pExpr->resType;
return (SNode*)pCol;
}
@ -1062,8 +1063,9 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
SNode* pTarget = NULL;
bool found = false;
FOREACH(pTarget, pTargets) {
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
(0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget))
// || (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))
) {
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
if (TSDB_CODE_SUCCESS != code) {
break;

View File

@ -86,7 +86,7 @@ pip3 install kafka-python
python3 kafka_example_consumer.py
# 21
pip3 install taos-ws-py==0.2.6
pip3 install taos-ws-py==0.3.1
python3 conn_websocket_pandas.py
# 22

View File

@ -1132,6 +1132,9 @@ e
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/event0.sim
,,y,script,./test.sh -f tsim/stream/event1.sim
,,y,script,./test.sh -f tsim/stream/event2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim

View File

@ -70,7 +70,116 @@ class TDTestCase:
tdSql.error('''insert into %s.tb1 (c8, c9) values(now, 1);'''%(database))
def use_select_sort(self,database):
ts = 1604298064000
tdSql.execute('''drop database if exists %s ;''' %database)
tdSql.execute('''create database %s keep 36500 ;'''%(database))
tdSql.execute('''use %s;'''%database)
tdSql.execute('''create stable %s.st (ts timestamp, val int, vt timestamp) tags (location NCHAR(100));'''%(database))
tdSql.execute('''create table %s.t1 using %s.st (location) tags ("0001");'''%(database,database))
tdSql.execute('''create table %s.t2 using %s.st (location) tags ("0002");'''%(database,database))
tdSql.execute('''create table %s.mt (ts timestamp, val int);'''%(database))
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) ({ts}, 2, {ts});'''%(database))
tdSql.query("select ts, val from %s.t1;"%database)
tdSql.checkData(0,1,2)
ts += 1
tdSql.execute(f'''insert into %s.t2 values({ts}, 1, {ts}) ({ts}, 5, {ts}) ({ts}, 2, {ts});'''%(database))
tdSql.query("select ts, val from %s.t2;"%database)
tdSql.checkData(0,1,2)
tdSql.execute('''delete from %s.t2;'''%(database))
tdSql.execute('''delete from %s.t1;'''%(database))
ts -= 10
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
ts += 11
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
ts += 1
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
tdSql.query("select count(*) from %s.st;"%database)
tdSql.checkData(0,0,6)
tdSql.query('''select vt, val from %s.st order by vt, val desc;'''%(database))
tdSql.checkData(0,1,2)
tdSql.checkData(1,1,1)
tdSql.checkData(2,1,2)
tdSql.checkData(3,1,1)
tdSql.checkData(4,1,2)
tdSql.checkData(5,1,1)
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by vt, val desc;'''%(database,database))
tdSql.query("select count(*) from %s.mt;"%database)
tdSql.checkData(0,0,3)
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
tdSql.checkData(0,1,1)
tdSql.checkData(1,1,1)
tdSql.checkData(2,1,1)
tdSql.execute('''delete from %s.mt;'''%(database))
tdSql.query('''select vt, val from %s.st order by vt, val asc;'''%(database))
tdSql.checkData(0,1,1)
tdSql.checkData(1,1,2)
tdSql.checkData(2,1,1)
tdSql.checkData(3,1,2)
tdSql.checkData(4,1,1)
tdSql.checkData(5,1,2)
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by vt, val asc;'''%(database,database))
tdSql.query("select count(*) from %s.mt;"%database)
tdSql.checkData(0,0,3)
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
tdSql.checkData(0,1,2)
tdSql.checkData(1,1,2)
tdSql.checkData(2,1,2)
tdSql.execute('''delete from %s.mt;'''%(database))
tdSql.query('''select vt, val from %s.st order by ts, val asc;'''%(database))
tdSql.checkData(0,1,1)
tdSql.checkData(1,1,2)
tdSql.checkData(2,1,1)
tdSql.checkData(3,1,2)
tdSql.checkData(4,1,1)
tdSql.checkData(5,1,2)
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by ts, val asc;'''%(database,database))
tdSql.query("select count(*) from %s.mt;"%database)
tdSql.checkData(0,0,3)
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
tdSql.checkData(0,1,2)
tdSql.checkData(1,1,2)
tdSql.checkData(2,1,2)
tdSql.execute('''delete from %s.mt;'''%(database))
ts += 1
tdSql.execute(f'''insert into %s.t1 values({ts}, -1, {ts}) %s.t2 values({ts}, -2, {ts});'''%(database,database))
tdSql.query('''select vt, val from %s.st order by val asc;'''%(database))
tdSql.checkData(0,1,-2)
tdSql.checkData(1,1,-1)
tdSql.checkData(2,1,1)
tdSql.checkData(3,1,1)
tdSql.checkData(4,1,1)
tdSql.checkData(5,1,2)
tdSql.checkData(6,1,2)
tdSql.checkData(7,1,2)
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by val asc;'''%(database,database))
tdSql.query("select count(*) from %s.mt;"%database)
tdSql.checkData(0,0,4)
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
tdSql.checkData(0,1,2)
tdSql.checkData(1,1,2)
tdSql.checkData(2,1,2)
tdSql.checkData(3,1,-1)
def run(self):
@ -88,6 +197,8 @@ class TDTestCase:
self.users_bug_TD_20592("%s" %self.db)
self.use_select_sort("%s" %self.db)
#taos -f sql
print("taos -f sql start!")
taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename)

View File

@ -1,3 +1,4 @@
from sqlite3 import ProgrammingError
import taos
import sys
import time
@ -319,10 +320,144 @@ class TDTestCase:
tdSql.checkData(0, 0, '2018-11-25 19:30:00.000')
tdSql.checkData(0, 1, '2018-11-25 19:30:01.000')
def test_cache_scan_with_drop_and_add_column(self):
tdSql.query("select last(c10) from meters")
tdSql.checkData(0, 0, '2018-11-25 19:30:01')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query("select last(c10) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
tdSql.checkData(0, 10, None)
tdSql.query('select last(c10), c10, ts from meters', queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
def test_cache_scan_with_drop_and_add_column2(self):
tdSql.query("select last(c1) from meters")
tdSql.checkData(0, 0, '999')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"])
p.check_returncode()
tdSql.query("select last(c1) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkData(0, 1, None)
tdSql.query('select last(c1), c1, ts from meters', queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
try:
tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, None)
try:
tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, None)
def test_cache_scan_with_drop_column(self):
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c9"])
p.check_returncode()
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 9, None)
def test_cache_scan_last_row_with_drop_column(self):
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 10, None)
def test_cache_scan_last_row_with_drop_column2(self):
tdSql.query('select last_row(c1) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkData(0, 0, None)
def test_cache_scan_last_row_with_partition_by(self):
tdSql.query('select last(c1) from meters partition by t1')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def test_cache_scan_last_row_with_partition_by_tbname(self):
tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def run(self):
self.prepareTestEnv()
#time.sleep(99999999)
self.test_last_cache_scan()
#self.test_cache_scan_with_drop_and_add_column()
self.test_cache_scan_with_drop_and_add_column2()
#self.test_cache_scan_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column2()
#self.test_cache_scan_last_row_with_partition_by()
#self.test_cache_scan_last_row_with_partition_by_tbname()
def stop(self):
tdSql.close()