Keep the last data with the same timestamp when inserting
This commit is contained in:
parent
7bb06cb1cc
commit
a0e356bf84
|
@ -189,7 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t lastTs = TSKEY_MIN;
|
int64_t lastTs = TSKEY_MIN;
|
||||||
bool ignoreRow = false;
|
bool updateLastRow = false;
|
||||||
bool disorderTs = false;
|
bool disorderTs = false;
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||||
|
@ -249,7 +249,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
} else {
|
} else {
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
||||||
if (*(int64_t*)var == lastTs) {
|
if (*(int64_t*)var == lastTs) {
|
||||||
ignoreRow = true;
|
updateLastRow = true;
|
||||||
} else if (*(int64_t*)var < lastTs) {
|
} else if (*(int64_t*)var < lastTs) {
|
||||||
disorderTs = true;
|
disorderTs = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -269,15 +269,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ignoreRow) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ignoreRow) {
|
|
||||||
ignoreRow = false;
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRow* pRow = NULL;
|
SRow* pRow = NULL;
|
||||||
|
@ -285,7 +276,14 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
if (updateLastRow) {
|
||||||
|
updateLastRow = false;
|
||||||
|
SRow** lastRow = taosArrayPop(tbData.aRowP);
|
||||||
|
tRowDestroy(*lastRow);
|
||||||
taosArrayPush(tbData.aRowP, &pRow);
|
taosArrayPush(tbData.aRowP, &pRow);
|
||||||
|
} else {
|
||||||
|
taosArrayPush(tbData.aRowP, &pRow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (disorderTs) {
|
if (disorderTs) {
|
||||||
|
|
|
@ -4627,15 +4627,10 @@ static int32_t addOrderByPrimaryKeyToQueryImpl(STranslateContext* pCxt, SNode* p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) {
|
static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) {
|
||||||
SNode* pOrderNode = NULL;
|
|
||||||
SNodeList* pOrederList = ((SSelectStmt*)pStmt)->pOrderByList;
|
SNodeList* pOrederList = ((SSelectStmt*)pStmt)->pOrderByList;
|
||||||
if (pOrederList != NULL) {
|
if (pOrederList && pOrederList->length > 0) {
|
||||||
FOREACH(pOrderNode, pOrederList) {
|
|
||||||
if (((SColumnNode*)pPrimaryKeyExpr)->colId == ((SColumnNode*)((SOrderByExprNode*)pOrderNode)->pExpr)->colId) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||||
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList);
|
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList);
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,116 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.error('''insert into %s.tb1 (c8, c9) values(now, 1);'''%(database))
|
tdSql.error('''insert into %s.tb1 (c8, c9) values(now, 1);'''%(database))
|
||||||
|
|
||||||
|
def use_select_sort(self,database):
|
||||||
|
ts = 1604298064000
|
||||||
|
|
||||||
|
tdSql.execute('''drop database if exists %s ;''' %database)
|
||||||
|
tdSql.execute('''create database %s keep 36500 ;'''%(database))
|
||||||
|
tdSql.execute('''use %s;'''%database)
|
||||||
|
|
||||||
|
tdSql.execute('''create stable %s.st (ts timestamp, val int, vt timestamp) tags (location NCHAR(100));'''%(database))
|
||||||
|
tdSql.execute('''create table %s.t1 using %s.st (location) tags ("0001");'''%(database,database))
|
||||||
|
tdSql.execute('''create table %s.t2 using %s.st (location) tags ("0002");'''%(database,database))
|
||||||
|
tdSql.execute('''create table %s.mt (ts timestamp, val int);'''%(database))
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) ({ts}, 2, {ts});'''%(database))
|
||||||
|
tdSql.query("select ts, val from %s.t1;"%database)
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
|
||||||
|
ts += 1
|
||||||
|
tdSql.execute(f'''insert into %s.t2 values({ts}, 1, {ts}) ({ts}, 5, {ts}) ({ts}, 2, {ts});'''%(database))
|
||||||
|
tdSql.query("select ts, val from %s.t2;"%database)
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''delete from %s.t2;'''%(database))
|
||||||
|
tdSql.execute('''delete from %s.t1;'''%(database))
|
||||||
|
|
||||||
|
ts -= 10
|
||||||
|
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
|
||||||
|
ts += 11
|
||||||
|
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
|
||||||
|
ts += 1
|
||||||
|
tdSql.execute(f'''insert into %s.t1 values({ts}, 1, {ts}) %s.t2 values({ts}, 2, {ts});'''%(database,database))
|
||||||
|
|
||||||
|
tdSql.query("select count(*) from %s.st;"%database)
|
||||||
|
tdSql.checkData(0,0,6)
|
||||||
|
|
||||||
|
tdSql.query('''select vt, val from %s.st order by vt, val desc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
tdSql.checkData(1,1,1)
|
||||||
|
tdSql.checkData(2,1,2)
|
||||||
|
tdSql.checkData(3,1,1)
|
||||||
|
tdSql.checkData(4,1,2)
|
||||||
|
tdSql.checkData(5,1,1)
|
||||||
|
|
||||||
|
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by vt, val desc;'''%(database,database))
|
||||||
|
tdSql.query("select count(*) from %s.mt;"%database)
|
||||||
|
tdSql.checkData(0,0,3)
|
||||||
|
|
||||||
|
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,1)
|
||||||
|
tdSql.checkData(1,1,1)
|
||||||
|
tdSql.checkData(2,1,1)
|
||||||
|
|
||||||
|
tdSql.execute('''delete from %s.mt;'''%(database))
|
||||||
|
tdSql.query('''select vt, val from %s.st order by vt, val asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,1)
|
||||||
|
tdSql.checkData(1,1,2)
|
||||||
|
tdSql.checkData(2,1,1)
|
||||||
|
tdSql.checkData(3,1,2)
|
||||||
|
tdSql.checkData(4,1,1)
|
||||||
|
tdSql.checkData(5,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by vt, val asc;'''%(database,database))
|
||||||
|
tdSql.query("select count(*) from %s.mt;"%database)
|
||||||
|
tdSql.checkData(0,0,3)
|
||||||
|
|
||||||
|
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
tdSql.checkData(1,1,2)
|
||||||
|
tdSql.checkData(2,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''delete from %s.mt;'''%(database))
|
||||||
|
tdSql.query('''select vt, val from %s.st order by ts, val asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,1)
|
||||||
|
tdSql.checkData(1,1,2)
|
||||||
|
tdSql.checkData(2,1,1)
|
||||||
|
tdSql.checkData(3,1,2)
|
||||||
|
tdSql.checkData(4,1,1)
|
||||||
|
tdSql.checkData(5,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by ts, val asc;'''%(database,database))
|
||||||
|
tdSql.query("select count(*) from %s.mt;"%database)
|
||||||
|
tdSql.checkData(0,0,3)
|
||||||
|
|
||||||
|
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
tdSql.checkData(1,1,2)
|
||||||
|
tdSql.checkData(2,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''delete from %s.mt;'''%(database))
|
||||||
|
ts += 1
|
||||||
|
tdSql.execute(f'''insert into %s.t1 values({ts}, -1, {ts}) %s.t2 values({ts}, -2, {ts});'''%(database,database))
|
||||||
|
tdSql.query('''select vt, val from %s.st order by val asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,-2)
|
||||||
|
tdSql.checkData(1,1,-1)
|
||||||
|
tdSql.checkData(2,1,1)
|
||||||
|
tdSql.checkData(3,1,1)
|
||||||
|
tdSql.checkData(4,1,1)
|
||||||
|
tdSql.checkData(5,1,2)
|
||||||
|
tdSql.checkData(6,1,2)
|
||||||
|
tdSql.checkData(7,1,2)
|
||||||
|
|
||||||
|
tdSql.execute('''insert into %s.mt select vt, val from %s.st order by val asc;'''%(database,database))
|
||||||
|
tdSql.query("select count(*) from %s.mt;"%database)
|
||||||
|
tdSql.checkData(0,0,4)
|
||||||
|
|
||||||
|
tdSql.query('''select ts, val from %s.mt order by ts asc;'''%(database))
|
||||||
|
tdSql.checkData(0,1,2)
|
||||||
|
tdSql.checkData(1,1,2)
|
||||||
|
tdSql.checkData(2,1,2)
|
||||||
|
tdSql.checkData(3,1,-1)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
||||||
|
@ -88,6 +197,8 @@ class TDTestCase:
|
||||||
|
|
||||||
self.users_bug_TD_20592("%s" %self.db)
|
self.users_bug_TD_20592("%s" %self.db)
|
||||||
|
|
||||||
|
self.use_select_sort("%s" %self.db)
|
||||||
|
|
||||||
#taos -f sql
|
#taos -f sql
|
||||||
print("taos -f sql start!")
|
print("taos -f sql start!")
|
||||||
taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename)
|
taos_cmd1 = "taos -f %s/%s.sql" % (self.testcasePath,self.testcaseFilename)
|
||||||
|
|
Loading…
Reference in New Issue