diff --git a/include/util/tdef.h b/include/util/tdef.h index cbbf3b8ff5..ad7206f7bb 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -247,7 +247,7 @@ typedef enum ELogicConditionType { #define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 -#define TSDB_SHOW_SQL_LEN 512 +#define TSDB_SHOW_SQL_LEN 1024 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SHOW_SUBQUERY_LEN 1000 diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 1517684ccd..38a6bafe9a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -125,11 +125,15 @@ static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR}, -}; + {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + }; static const SSysDbTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 40373d5542..613fa26c2d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -882,6 +882,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this if (pInfo->assignBlockUid) { pInfo->pRes->info.groupId = uid; + } else { + pInfo->pRes->info.groupId = groupId; } int32_t numOfCols = pInfo->pRes->info.numOfCols; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 47dda8fc2b..5d0a0a0270 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1128,7 +1128,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } - pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); } finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);