Merge branch '3.0' into enh/TD-30490-3.0

This commit is contained in:
kailixu 2024-06-13 08:54:57 +08:00
commit a876164b33
9 changed files with 145 additions and 17 deletions

View File

@ -3494,6 +3494,8 @@ typedef struct SVUpdateCheckpointInfoReq {
int64_t checkpointVer;
int64_t checkpointTs;
int32_t transId;
int64_t hStreamId; // add encode/decode
int64_t hTaskId;
int8_t dropRelHTask;
} SVUpdateCheckpointInfoReq;

View File

@ -795,10 +795,10 @@ function is_version_compatible() {
if [ -f ${script_dir}/driver/vercomp.txt ]; then
min_compatible_version=$(cat ${script_dir}/driver/vercomp.txt)
else
min_compatible_version=$(${script_dir}/bin/${serverName} -V | head -1 | cut -d ' ' -f 5)
min_compatible_version=$(${script_dir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 5)
fi
exist_version=$(${installDir}/bin/${serverName} -V | head -1 | cut -d ' ' -f 3)
exist_version=$(${installDir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 3)
vercomp $exist_version "3.0.0.0"
case $? in
2)

View File

@ -716,6 +716,8 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
pReq->checkpointTs = pInfo->ts;
pReq->dropRelHTask = pInfo->dropHTask;
pReq->transId = pInfo->transId;
pReq->hStreamId = pTask->hTaskInfo.id.streamId;
pReq->hTaskId = pTask->hTaskInfo.id.taskId;
}
}

View File

@ -646,8 +646,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
}
streamMetaWUnLock(pMeta);
// tqStreamRemoveTaskBackend(pMeta, &id);
return 0;
}

View File

@ -3203,6 +3203,27 @@ static bool fromSingleTable(SNode* table) {
return false;
}
static bool IsEqualTbNameFuncNode(SSelectStmt* pSelect, SNode* pFunc1, SNode* pFunc2) {
if (isTbnameFuction(pFunc1) && isTbnameFuction(pFunc2)) {
SValueNode* pVal1 = (SValueNode*)nodesListGetNode(((SFunctionNode*)pFunc1)->pParameterList, 0);
SValueNode* pVal2 = (SValueNode*)nodesListGetNode(((SFunctionNode*)pFunc1)->pParameterList, 0);
if (!pVal1 && !pVal2) {
return true;
} else if (pVal1 && pVal2) {
return strcmp(pVal1->literal, pVal2->literal) == 0;
}
if (pSelect->pFromTable &&
(pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE || pSelect->pFromTable->type == QUERY_NODE_TEMP_TABLE)) {
STableNode* pTable = (STableNode*)pSelect->pFromTable;
return true;
} else {
return false;
}
}
return false;
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
@ -3222,6 +3243,9 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
if (IsEqualTbNameFuncNode(pSelect, pActualNode, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
SNode* pPartKey = NULL;
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
@ -3233,6 +3257,9 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
if (IsEqualTbNameFuncNode(pSelect, pPartKey, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
if (NULL != pSelect->pWindow && QUERY_NODE_STATE_WINDOW == nodeType(pSelect->pWindow)) {
if (nodesEqualNode(((SStateWindowNode*)pSelect->pWindow)->pExpr, *pNode)) {

View File

@ -3011,7 +3011,9 @@ static SNode* partTagsCreateWrapperFunc(const char* pFuncName, SNode* pNode) {
}
snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pFuncName);
if (QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
if ((QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) ||
(QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType &&
((SColumnNode*)pNode)->tableAlias[0] != '\0')){
SColumnNode* pCol = (SColumnNode*)pNode;
partTagsSetAlias(pFunc->node.aliasName, pCol->tableAlias, pCol->colName);
} else {

View File

@ -418,7 +418,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
STaskId hTaskId = {0};
taosThreadMutexLock(&pTask->lock);
@ -434,7 +433,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
@ -476,9 +475,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
}
if (pReq->dropRelHTask) {
hTaskId = pTask->hTaskInfo.id;
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, hTaskId.taskId);
pReq->taskId, vgId, pReq->hTaskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
}
@ -499,10 +497,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)hTaskId.taskId, numOfTasks);
(int32_t)pReq->hTaskId, numOfTasks);
}
streamMetaWLock(pMeta);

View File

@ -640,6 +640,15 @@ class TDTestCase(TDTestCase):
sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a group by tbname order by a.tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql1 = f"select * from ({sql})"
self.data_check_tbname(sql1,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
@ -647,7 +656,16 @@ class TDTestCase(TDTestCase):
self.data_check_tbname(sql2,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 where ts is null group by tbname order by tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by a.tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by a.tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by tbname,count(*) "
self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql1 = f"select * from ({sql})"
self.data_check_tbname(sql1,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
@ -1011,7 +1029,19 @@ class TDTestCase(TDTestCase):
#union all
sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname,AGG(COLUMN),count(*)"
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by tbname,AGG(COLUMN),count(*)"
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,AGG(COLUMN),count(*)"
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by tbname order by a.tbname,AGG(COLUMN),count(*)"
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql1 = f"select * from ({sql})"
self.data_check_tbname(sql1,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
@ -1572,16 +1602,79 @@ class TDTestCase(TDTestCase):
tdSql.execute('alter stable stable_1 drop column q_binary5;')
tdSql.execute('alter stable stable_1 drop column q_nchar4;')
tdSql.execute('alter stable stable_1 drop column q_binary4;')
def testTBNameUseJoin(self):
tdSql.execute('CREATE STABLE `meter1` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT)')
tdSql.execute('CREATE STABLE `meter2` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT)')
tdSql.execute('CREATE TABLE `d1` USING `meter1` (`t1`) TAGS (1)')
tdSql.execute('CREATE TABLE `d2` USING `meter1` (`t1`) TAGS (2)')
tdSql.execute('CREATE TABLE `d21` USING `meter2` (`t1`) TAGS (21)')
tdSql.execute('CREATE TABLE `d22` USING `meter2` (`t1`) TAGS (22)')
time.sleep(1)
tdSql.query('select tbname,count(*) from d2')
tdSql.checkData(0, 1, 0)
tdSql.execute('insert into `d1` VALUES (now, 1)')
tdSql.execute('insert into `d1` VALUES (now+1s, 2)')
tdSql.execute('insert into `d1` VALUES (now+2s, 3)')
tdSql.execute('insert into `d2` VALUES (now+3s, 11)')
tdSql.execute('insert into `d2` VALUES (now+4s, 22)')
tdSql.execute('insert into `d2` VALUES (now+5s, 33)')
tdSql.execute('insert into `d21` select * from `d1`')
# tdSql.query('select b.tbname, count(*) from d1 a, d2 b where a.ts = b.ts group by b.tbname')
# tdSql.checkData(0, 0, 'd2')
tdSql.query('select meter1.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname')
tdSql.checkData(0, 0, 'd1')
tdSql.checkData(0, 1, 3)
# tdSql.checkData(1, 0, 'd2')
tdSql.query('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter2.tbname order by meter2.tbname')
tdSql.checkData(0, 0, 'd21')
tdSql.checkData(0, 1, 3)
# tdSql.checkData(1, 0, 'd22')
tdSql.query('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by meter2.tbname order by meter2.tbname')
tdSql.checkData(0, 0, 'd21')
tdSql.checkData(0, 1, 3)
# tdSql.checkData(1, 0, 'd22')
tdSql.query('select m2.tbname, count(*) from meter1 m1, meter2 m2 where m1.ts = m2.ts partition by m2.tbname order by m2.tbname')
tdSql.checkData(0, 0, 'd21')
tdSql.checkData(0, 1, 3)
# tdSql.checkData(1, 0, 'd22')
tdSql.execute('insert into `d22` select * from `d1`')
tdSql.query('select meter1.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname')
tdSql.checkData(0, 0, 'd1')
tdSql.checkData(0, 1, 6)
tdSql.query('select m2.tbname, count(*) from meter1 m1, meter2 m2 where m1.ts = m2.ts partition by m2.tbname order by m2.tbname')
tdSql.checkData(0, 0, 'd21')
tdSql.checkData(0, 1, 3)
tdSql.checkData(1, 0, 'd22')
tdSql.checkData(1, 1, 3)
tdSql.error('select tbname, count(*) from d1 a, d2 b where a.ts = b.ts group by b.tbname')
# tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname')
tdSql.error('select tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter2.tbname order by meter2.tbname')
tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by meter2.tbname order by meter.tbname')
tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by tbname order by meter2.tbname')
tdSql.error('select m2.tbname, count(*) from meter1 m1, meter2 m2 where meter1.ts = meter2.ts partition by m2.tbname order by meter2.tbname')
def run(self):
tdSql.prepare()
startTime = time.time()
# self.create_tables()
# self.insert_data()
# self.insert_data()
#self.testTBNameUseJoin()
self.dropandcreateDB_random("nested", 1)
self.testTBNameUseJoin()
self.modify_tables()
for i in range(1):
@ -1590,6 +1683,8 @@ class TDTestCase(TDTestCase):
self.tbname_agg_all()
endTime = time.time()
print("total time %ds" % (endTime - startTime))

View File

@ -382,7 +382,11 @@ class TDTestCase(TDTestCase):
#union all
sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname "
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by tbname "
sql = f"({sql}) union all ({sql}) order by tb"
self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')
sql1 = f"select * from ({sql})"
self.data_check_tbname(sql1,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')