diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f79b11431e..fa89ca917a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3494,6 +3494,8 @@ typedef struct SVUpdateCheckpointInfoReq { int64_t checkpointVer; int64_t checkpointTs; int32_t transId; + int64_t hStreamId; // add encode/decode + int64_t hTaskId; int8_t dropRelHTask; } SVUpdateCheckpointInfoReq; diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 5a83cdc6a8..03e0a0b5f5 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -795,10 +795,10 @@ function is_version_compatible() { if [ -f ${script_dir}/driver/vercomp.txt ]; then min_compatible_version=$(cat ${script_dir}/driver/vercomp.txt) else - min_compatible_version=$(${script_dir}/bin/${serverName} -V | head -1 | cut -d ' ' -f 5) + min_compatible_version=$(${script_dir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 5) fi - exist_version=$(${installDir}/bin/${serverName} -V | head -1 | cut -d ' ' -f 3) + exist_version=$(${installDir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 3) vercomp $exist_version "3.0.0.0" case $? in 2) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a0731833e6..e47f28c309 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -716,6 +716,8 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas pReq->checkpointTs = pInfo->ts; pReq->dropRelHTask = pInfo->dropHTask; pReq->transId = pInfo->transId; + pReq->hStreamId = pTask->hTaskInfo.id.streamId; + pReq->hTaskId = pTask->hTaskInfo.id.taskId; } } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cabccfc0c8..d16c41ec1e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -646,8 +646,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } streamMetaWUnLock(pMeta); - -// tqStreamRemoveTaskBackend(pMeta, &id); return 0; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7e55f2cfeb..3dbba397ba 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3203,6 +3203,27 @@ static bool fromSingleTable(SNode* table) { return false; } +static bool IsEqualTbNameFuncNode(SSelectStmt* pSelect, SNode* pFunc1, SNode* pFunc2) { + if (isTbnameFuction(pFunc1) && isTbnameFuction(pFunc2)) { + SValueNode* pVal1 = (SValueNode*)nodesListGetNode(((SFunctionNode*)pFunc1)->pParameterList, 0); + SValueNode* pVal2 = (SValueNode*)nodesListGetNode(((SFunctionNode*)pFunc1)->pParameterList, 0); + if (!pVal1 && !pVal2) { + return true; + } else if (pVal1 && pVal2) { + return strcmp(pVal1->literal, pVal2->literal) == 0; + } + + if (pSelect->pFromTable && + (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE || pSelect->pFromTable->type == QUERY_NODE_TEMP_TABLE)) { + STableNode* pTable = (STableNode*)pSelect->pFromTable; + return true; + } else { + return false; + } + } + return false; +} + static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { STranslateContext* pCxt = (STranslateContext*)pContext; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; @@ -3222,6 +3243,9 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) { return rewriteExprToGroupKeyFunc(pCxt, pNode); } + if (IsEqualTbNameFuncNode(pSelect, pActualNode, *pNode)) { + return rewriteExprToGroupKeyFunc(pCxt, pNode); + } } SNode* pPartKey = NULL; bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList); @@ -3233,6 +3257,9 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) { return rewriteExprToGroupKeyFunc(pCxt, pNode); } + if (IsEqualTbNameFuncNode(pSelect, pPartKey, *pNode)) { + return rewriteExprToGroupKeyFunc(pCxt, pNode); + } } if (NULL != pSelect->pWindow && QUERY_NODE_STATE_WINDOW == nodeType(pSelect->pWindow)) { if (nodesEqualNode(((SStateWindowNode*)pSelect->pWindow)->pExpr, *pNode)) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index b271c83678..b00767b6b6 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3011,7 +3011,9 @@ static SNode* partTagsCreateWrapperFunc(const char* pFuncName, SNode* pNode) { } snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pFuncName); - if (QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { + if ((QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) || + (QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType && + ((SColumnNode*)pNode)->tableAlias[0] != '\0')){ SColumnNode* pCol = (SColumnNode*)pNode; partTagsSetAlias(pFunc->node.aliasName, pCol->tableAlias, pCol->colName); } else { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6696c9f8c2..eedd8f20d6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -418,7 +418,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; - STaskId hTaskId = {0}; taosThreadMutexLock(&pTask->lock); @@ -434,7 +433,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); @@ -476,9 +475,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } if (pReq->dropRelHTask) { - hTaskId = pTask->hTaskInfo.id; stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", - pReq->taskId, vgId, hTaskId.taskId); + pReq->taskId, vgId, pReq->hTaskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } @@ -499,10 +497,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, - (int32_t)hTaskId.taskId, numOfTasks); + (int32_t)pReq->hTaskId, numOfTasks); } streamMetaWLock(pMeta); diff --git a/tests/system-test/2-query/agg_group_AlwaysReturnValue.py b/tests/system-test/2-query/agg_group_AlwaysReturnValue.py index 9ed0e3fc4a..0cac9cb03b 100755 --- a/tests/system-test/2-query/agg_group_AlwaysReturnValue.py +++ b/tests/system-test/2-query/agg_group_AlwaysReturnValue.py @@ -640,6 +640,15 @@ class TDTestCase(TDTestCase): sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname,count(*) " self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a group by tbname order by a.tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + sql1 = f"select * from ({sql})" self.data_check_tbname(sql1,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') @@ -647,7 +656,16 @@ class TDTestCase(TDTestCase): self.data_check_tbname(sql2,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 where ts is null group by tbname order by tbname,count(*) " - self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by a.tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by a.tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname,AGG(COLUMN) from {dbname}.stable_1 a where ts is null group by a.tbname order by tbname,count(*) " + self.data_check_tbname(sql,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') sql1 = f"select * from ({sql})" self.data_check_tbname(sql1,'NULL','NULL',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') @@ -1011,7 +1029,19 @@ class TDTestCase(TDTestCase): #union all sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname,AGG(COLUMN),count(*)" sql = f"({sql}) union all ({sql}) order by tb" - self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by tbname,AGG(COLUMN),count(*)" + sql = f"({sql}) union all ({sql}) order by tb" + self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by a.tbname,AGG(COLUMN),count(*)" + sql = f"({sql}) union all ({sql}) order by tb" + self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by tbname order by a.tbname,AGG(COLUMN),count(*)" + sql = f"({sql}) union all ({sql}) order by tb" + self.data_check_tbname(sql,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') sql1 = f"select * from ({sql})" self.data_check_tbname(sql1,'AGG24','AGG24',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') @@ -1572,16 +1602,79 @@ class TDTestCase(TDTestCase): tdSql.execute('alter stable stable_1 drop column q_binary5;') tdSql.execute('alter stable stable_1 drop column q_nchar4;') tdSql.execute('alter stable stable_1 drop column q_binary4;') - + + def testTBNameUseJoin(self): + tdSql.execute('CREATE STABLE `meter1` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT)') + tdSql.execute('CREATE STABLE `meter2` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT)') + + tdSql.execute('CREATE TABLE `d1` USING `meter1` (`t1`) TAGS (1)') + tdSql.execute('CREATE TABLE `d2` USING `meter1` (`t1`) TAGS (2)') + tdSql.execute('CREATE TABLE `d21` USING `meter2` (`t1`) TAGS (21)') + tdSql.execute('CREATE TABLE `d22` USING `meter2` (`t1`) TAGS (22)') + + time.sleep(1) + tdSql.query('select tbname,count(*) from d2') + tdSql.checkData(0, 1, 0) + + tdSql.execute('insert into `d1` VALUES (now, 1)') + tdSql.execute('insert into `d1` VALUES (now+1s, 2)') + tdSql.execute('insert into `d1` VALUES (now+2s, 3)') + tdSql.execute('insert into `d2` VALUES (now+3s, 11)') + tdSql.execute('insert into `d2` VALUES (now+4s, 22)') + tdSql.execute('insert into `d2` VALUES (now+5s, 33)') + tdSql.execute('insert into `d21` select * from `d1`') + + # tdSql.query('select b.tbname, count(*) from d1 a, d2 b where a.ts = b.ts group by b.tbname') + # tdSql.checkData(0, 0, 'd2') + + tdSql.query('select meter1.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname') + tdSql.checkData(0, 0, 'd1') + tdSql.checkData(0, 1, 3) + # tdSql.checkData(1, 0, 'd2') + tdSql.query('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter2.tbname order by meter2.tbname') + tdSql.checkData(0, 0, 'd21') + tdSql.checkData(0, 1, 3) + # tdSql.checkData(1, 0, 'd22') + tdSql.query('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by meter2.tbname order by meter2.tbname') + tdSql.checkData(0, 0, 'd21') + tdSql.checkData(0, 1, 3) + # tdSql.checkData(1, 0, 'd22') + tdSql.query('select m2.tbname, count(*) from meter1 m1, meter2 m2 where m1.ts = m2.ts partition by m2.tbname order by m2.tbname') + tdSql.checkData(0, 0, 'd21') + tdSql.checkData(0, 1, 3) + # tdSql.checkData(1, 0, 'd22') + + tdSql.execute('insert into `d22` select * from `d1`') + + tdSql.query('select meter1.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname') + tdSql.checkData(0, 0, 'd1') + tdSql.checkData(0, 1, 6) + + tdSql.query('select m2.tbname, count(*) from meter1 m1, meter2 m2 where m1.ts = m2.ts partition by m2.tbname order by m2.tbname') + tdSql.checkData(0, 0, 'd21') + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 0, 'd22') + tdSql.checkData(1, 1, 3) + + tdSql.error('select tbname, count(*) from d1 a, d2 b where a.ts = b.ts group by b.tbname') + # tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter1.tbname order by meter1.tbname') + tdSql.error('select tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts group by meter2.tbname order by meter2.tbname') + tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by meter2.tbname order by meter.tbname') + tdSql.error('select meter2.tbname, count(*) from meter1, meter2 where meter1.ts = meter2.ts partition by tbname order by meter2.tbname') + tdSql.error('select m2.tbname, count(*) from meter1 m1, meter2 m2 where meter1.ts = meter2.ts partition by m2.tbname order by meter2.tbname') + def run(self): tdSql.prepare() startTime = time.time() # self.create_tables() - # self.insert_data() - + # self.insert_data() + + #self.testTBNameUseJoin() self.dropandcreateDB_random("nested", 1) + self.testTBNameUseJoin() + self.modify_tables() for i in range(1): @@ -1590,6 +1683,8 @@ class TDTestCase(TDTestCase): self.tbname_agg_all() + + endTime = time.time() print("total time %ds" % (endTime - startTime)) diff --git a/tests/system-test/2-query/agg_group_NotReturnValue.py b/tests/system-test/2-query/agg_group_NotReturnValue.py index 73a8fe04c3..83f0acd362 100755 --- a/tests/system-test/2-query/agg_group_NotReturnValue.py +++ b/tests/system-test/2-query/agg_group_NotReturnValue.py @@ -382,7 +382,11 @@ class TDTestCase(TDTestCase): #union all sql = f"select tbname tb,AGG(COLUMN) from {dbname}.stable_1 group by tbname order by tbname " sql = f"({sql}) union all ({sql}) order by tb" - self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') + + sql = f"select a.tbname tb,AGG(COLUMN) from {dbname}.stable_1 a group by a.tbname order by tbname " + sql = f"({sql}) union all ({sql}) order by tb" + self.data_check_tbname(sql,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}') sql1 = f"select * from ({sql})" self.data_check_tbname(sql1,'HAVING>04','HAVING>04',f'{base_fun}',f'{replace_fun}',f'{base_column}',f'{replace_column}')