From 285bf707e00afea4f8684754267e16eda2b870cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 May 2022 15:00:33 +0800 Subject: [PATCH 1/3] fix(query): set the gid for stream result. --- source/libs/executor/src/timewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6965771c73..eb997e9adb 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1128,7 +1128,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } - pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); } finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); From a78fd74dc698ca1afe23226de2afaa74fe0a7996 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 May 2022 15:46:07 +0800 Subject: [PATCH 2/3] fix(query): set group id in stream scanner. --- source/libs/executor/src/scanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 63c6f46083..55db986fb9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -882,6 +882,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this if (pInfo->assignBlockUid) { pInfo->pRes->info.groupId = uid; + } else { + pInfo->pRes->info.groupId = groupId; } int32_t numOfCols = pInfo->pRes->info.numOfCols; From 4995b0ef5b3b1754bf20b24d814d5bed8df1c6c5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 May 2022 16:24:56 +0800 Subject: [PATCH 3/3] refactor: update the stream table schema; --- include/util/tdef.h | 2 +- source/common/src/systable.c | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index cbbf3b8ff5..ad7206f7bb 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -247,7 +247,7 @@ typedef enum ELogicConditionType { #define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 -#define TSDB_SHOW_SQL_LEN 512 +#define TSDB_SHOW_SQL_LEN 1024 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 1517684ccd..38a6bafe9a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -125,11 +125,15 @@ static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR}, -}; + {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + }; static const SSysDbTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},