Merge remote-tracking branch 'origin/3.0' into feat/TD-27337

This commit is contained in:
dapan1121 2023-12-06 19:22:52 +08:00
commit 840690b59b
7 changed files with 226 additions and 35 deletions

View File

@ -296,7 +296,36 @@ typedef enum ENodeType {
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
// placeholder for [152, 180]
QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
// show statement nodes
// see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET'
QUERY_NODE_SHOW_DNODES_STMT = 400,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
QUERY_NODE_SHOW_QNODES_STMT,
@ -325,31 +354,6 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_VNODES_STMT,
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT,
QUERY_NODE_SHOW_VIEWS_STMT,
QUERY_NODE_SHOW_CREATE_VIEW_STMT,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_TABLE_TAGS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_RESTORE_DNODE_STMT,
QUERY_NODE_RESTORE_QNODE_STMT,
QUERY_NODE_RESTORE_MNODE_STMT,
QUERY_NODE_RESTORE_VNODE_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
QUERY_NODE_CREATE_VIEW_STMT,
QUERY_NODE_DROP_VIEW_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,

View File

@ -168,6 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {

View File

@ -22,17 +22,32 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
char *buf = taosMemoryCalloc(1, pCol->info.bytes);
SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE);
pRes->bytes = 0;
pRes->hasResult = true;
pRes->isNull = true;
varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pCol, row, buf, false);
taosMemoryFree(buf);
}
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = 0;
SFirstLastRes* p;
col_id_t colId;
uint64_t ts = TSKEY_MIN;
SFirstLastRes* p = NULL;
col_id_t colId = -1;
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
if (slotIds[i] == -1) {
setFirstLastResColToNull(pColInfoData, numOfRows);
continue;
}
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
colId = pColVal->colVal.cid;
@ -63,10 +78,14 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
if (ts == TSKEY_MIN) {
colDataSetNULL(pCol, numOfRows);
} else {
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
}
continue;
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) {
if (!p->isNull) {
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) {
if (p && !p->isNull) {
colDataSetVal(pCol, numOfRows, p->buf, false);
} else {
colDataSetNULL(pCol, numOfRows);
@ -81,6 +100,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t slotId = slotIds[i];
if (slotId == -1) {
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal;
@ -300,7 +323,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
int32_t bytes;
if (slotIds[j] == -1)
bytes = 1;
else
bytes = pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
@ -324,6 +353,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->numOfCols; ++i) {
int32_t slotId = slotIds[i];
if (slotId == -1) {
SLastCol p = {.ts = INT64_MIN, .colVal.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
taosArrayPush(pLastCols, &p);
continue;
}
struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
@ -348,6 +382,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
bool hasNotNullRow = true;
int64_t singleTableLastTs = INT64_MAX;
for (int32_t k = 0; k < pr->numOfCols; ++k) {
if (slotIds[k] == -1) continue;
SLastCol* p = taosArrayGet(pLastCols, k);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);

View File

@ -352,6 +352,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
bool found = false;
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->dstSlotId] = -1;
@ -361,9 +362,14 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[i] = j;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
found = true;
break;
}
}
if (!found) {
(*pSlotIds)[i] = -1;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
}
}
return TSDB_CODE_SUCCESS;

View File

@ -7442,6 +7442,13 @@ static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if ( (SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta
&& TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType
&& !hasPartitionByTbname(pSelect->pPartitionByList)
&& pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Event window for stream on super table must patitioned by table name");
}
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) {

View File

@ -1132,6 +1132,9 @@ e
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/event0.sim
,,y,script,./test.sh -f tsim/stream/event1.sim
,,y,script,./test.sh -f tsim/stream/event2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim

View File

@ -1,3 +1,4 @@
from sqlite3 import ProgrammingError
import taos
import sys
import time
@ -319,10 +320,144 @@ class TDTestCase:
tdSql.checkData(0, 0, '2018-11-25 19:30:00.000')
tdSql.checkData(0, 1, '2018-11-25 19:30:01.000')
def test_cache_scan_with_drop_and_add_column(self):
tdSql.query("select last(c10) from meters")
tdSql.checkData(0, 0, '2018-11-25 19:30:01')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query("select last(c10) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
tdSql.checkData(0, 10, None)
tdSql.query('select last(c10), c10, ts from meters', queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
def test_cache_scan_with_drop_and_add_column2(self):
tdSql.query("select last(c1) from meters")
tdSql.checkData(0, 0, '999')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"])
p.check_returncode()
tdSql.query("select last(c1) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkData(0, 1, None)
tdSql.query('select last(c1), c1, ts from meters', queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
try:
tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, None)
try:
tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, None)
def test_cache_scan_with_drop_column(self):
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c9"])
p.check_returncode()
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 9, None)
def test_cache_scan_last_row_with_drop_column(self):
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 10, None)
def test_cache_scan_last_row_with_drop_column2(self):
tdSql.query('select last_row(c1) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkData(0, 0, None)
def test_cache_scan_last_row_with_partition_by(self):
tdSql.query('select last(c1) from meters partition by t1')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def test_cache_scan_last_row_with_partition_by_tbname(self):
tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def run(self):
self.prepareTestEnv()
#time.sleep(99999999)
self.test_last_cache_scan()
#self.test_cache_scan_with_drop_and_add_column()
self.test_cache_scan_with_drop_and_add_column2()
#self.test_cache_scan_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column2()
#self.test_cache_scan_last_row_with_partition_by()
#self.test_cache_scan_last_row_with_partition_by_tbname()
def stop(self):
tdSql.close()