diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index 5a884cc7e2..ca22a6ace7 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -25,7 +25,7 @@ create_definition: col_name column_definition column_definition: - type_name [comment 'string_value'] [PRIMARY KEY] + type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... @@ -51,6 +51,7 @@ table_option: { Only ASCII visible characters can be used with escape character. **Parameter description** + 1. COMMENT: specifies comments for the table. This parameter can be used with supertables, standard tables, and subtables. 2. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum This parameter can be used with supertables and standard tables. 3. TTL: specifies the time to live (TTL) for the table. If TTL is specified when creatinga table, after the time period for which the table has been existing is over TTL, TDengine will automatically delete the table. Please be noted that the system may not delete the table at the exact moment that the TTL expires but guarantee there is such a system and finally the table will be deleted. The unit of TTL is in days. The default value is 0, i.e. never expire. @@ -104,6 +105,7 @@ alter_table_option: { **More explanations** You can perform the following modifications on existing tables: + 1. ADD COLUMN: adds a column to the supertable. 2. DROP COLUMN: deletes a column from the supertable. 3. MODIFY COLUMN: changes the length of the data type specified for the column. Note that you can only specify a length greater than the current length. @@ -154,6 +156,7 @@ alter_table_option: { ``` **More explanations** + 1. Only the value of a tag can be modified directly. For all other modifications, you must modify the supertable from which the subtable was created. ### Change Tag Value Of Sub Table diff --git a/docs/en/12-taos-sql/27-indexing.md b/docs/en/12-taos-sql/27-indexing.md index dfe3ef527c..9f77e21599 100644 --- a/docs/en/12-taos-sql/27-indexing.md +++ b/docs/en/12-taos-sql/27-indexing.md @@ -28,9 +28,9 @@ In the function list, you can only specify supported aggregate functions (see be Since the output of TSMA is a super table, the row length of the output table is subject to the maximum row length limit. The size of the `intermediate results of different functions` varies, but they are generally larger than the original data size. If the row length of the output table exceeds the maximum row length limit, an error `Row length exceeds max length` will be reported. In this case, you need to reduce the number of functions or split commonly used functions groups into multiple TSMA objects. -The window size is limited to [1ms ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds). +The window size is limited to [1m ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds). -TSMA is a database-level object, but it is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 8 and a range of [0-12]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created. +TSMA is a database-level object, but it is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 3 and a range of [0-3]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created. ## Supported Functions | function | comments | @@ -44,7 +44,6 @@ TSMA is a database-level object, but it is globally unique. The number of TSMA t |count| If you want to use count(*), you should create the count(ts) function| |spread|| |stddev|| -|hyperloglog|| ||| ## Drop TSMA @@ -65,6 +64,8 @@ Client configuration parameter: `querySmaOptimize`, used to control whether to u Client configuration parameter: `maxTsmaCalcDelay`, in seconds, is used to control the acceptable TSMA calculation delay for users. If the calculation progress of a TSMA is within this range from the latest time, the TSMA will be used. If it exceeds this range, it will not be used. The default value is 600 (10 minutes), with a minimum value of 600 (10 minutes) and a maximum value of 86400 (1 day). +Client configuration parameter: `tsmaDataDeleteMark`, in milliseconds, consistent with the stream computing parameter `deleteMark`, is used to control the retention time of intermediate results in stream computing. The default value is 1 day, with a minimum value of 1 hour. Therefore, historical data that is older than the configuration parameter will not have the intermediate results saved in stream computing. If you modify the data within these time windows, the TSMA calculation results will not include the updated results. This means that the TSMA results will be inconsistent with querying the original data. + ### Using TSMA Duraing Query The aggregate functions defined in TSMA can be directly used in most query scenarios. If multiple TSMA are available, the one with the larger window size is preferred. For unclosed windows, the calculation can be done using smaller window TSMA or the original data. However, there are certain scenarios where TSMA cannot be used (see below). In such cases, the entire query will be calculated using the original data. @@ -131,4 +132,4 @@ SHOW [db_name.]TSMAS; SELECT * FROM information_schema.ins_tsma; ``` -If more functions are specified during creation, and the column names are longer, the function list may be truncated when displayed (currently supports a maximum output of 256KB) \ No newline at end of file +If more functions are specified during creation, and the column names are longer, the function list may be truncated when displayed (currently supports a maximum output of 256KB) diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 1a9df366e3..a130bca65f 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -250,6 +250,15 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo | Value Range | 600s - 86400s, 10 minutes to 1 hour | | Default value | 600s | +### tsmaDataDeleteMark + +| Attribute | Description | +| -------- | --------------------------- | +| Applicable | Client only | +| Meaning | The duration for which the intermediate results of TSMA calculations are saved, in milliseconds | +| Value Range | >= 3600000, greater than or equal to 1 hour | +| Default value | 86400000, 1d | + ## Locale Parameters @@ -776,8 +785,8 @@ The charset that takes effect is UTF-8. | --------- | ----------------------------- | | Applicable | Server Only | | Meaning | Max num of TSMAs | -| Value Range | 0-12 | -| Default Value | 8 | +| Value Range | 0-3 | +| Default Value | 3 | ## 3.0 Parameters diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index f0c2bd407b..773ce75430 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -23,7 +23,10 @@ create_subtable_clause: { } create_definition: - col_name column_type [PRIMARY KEY] + col_name column_definition + +column_definition: + type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type'] table_options: table_option ... diff --git a/docs/zh/12-taos-sql/27-indexing.md b/docs/zh/12-taos-sql/27-indexing.md index 31057a67f8..189042c27a 100644 --- a/docs/zh/12-taos-sql/27-indexing.md +++ b/docs/zh/12-taos-sql/27-indexing.md @@ -28,9 +28,9 @@ TSMA只能基于超级表和普通表创建, 不能基于子表创建. 由于TSMA输出为一张超级表, 因此输出表的行长度受最大行长度限制, 不同函数的`中间结果`大小各异, 一般都大于原始数据大小, 若输出表的行长度大于最大行长度限制, 将会报`Row length exceeds max length`错误. 此时需要减少函数个数或者将常用的函数进行分组拆分到多个TSMA中. -窗口大小的限制为[1ms ~ 1h]. INTERVAL 的单位与查询中INTERVAL字句相同, 如 a (毫秒), b (纳秒), h (小时), m (分钟), s (秒), u (微妙). +窗口大小的限制为[1m ~ 1h]. INTERVAL 的单位与查询中INTERVAL子句相同, 如 a (毫秒), b (纳秒), h (小时), m (分钟), s (秒), u (微妙). -TSMA为库内对象, 但名字全局唯一. 集群内一共可创建TSMA个数受参数`maxTsmaNum`限制, 参数默认值为8, 范围: [0-12]. 注意, 由于TSMA后台计算使用流计算, 因此每创建一条TSMA, 将会创建一条流, 因此能够创建的TSMA条数也受当前已经存在的流条数和最大可创建流条数限制. +TSMA为库内对象, 但名字全局唯一. 集群内一共可创建TSMA个数受参数`maxTsmaNum`限制, 参数默认值为3, 范围: [0-3]. 注意, 由于TSMA后台计算使用流计算, 因此每创建一条TSMA, 将会创建一条流, 因此能够创建的TSMA条数也受当前已经存在的流条数和最大可创建流条数限制. ## 支持的函数列表 | 函数| 备注 | @@ -44,7 +44,6 @@ TSMA为库内对象, 但名字全局唯一. 集群内一共可创建TSMA个数 |count| 若想使用count(*), 则应创建count(ts)函数| |spread|| |stddev|| -|hyperloglog|| ||| ## 删除TSMA @@ -64,6 +63,8 @@ TSMA的计算结果为与原始表相同库下的一张超级表, 此表用户 客户端配置参数:`maxTsmaCalcDelay`,单位 s,用于控制用户可以接受的 TSMA 计算延迟,若 TSMA 的计算进度与最新时间差距在此范围内, 则该 TSMA 将会被使用, 若超出该范围, 则不使用, 默认值: 600(10 分钟), 最小值: 600(10 分钟), 最大值: 86400(1 天). +客户端配置参数: `tsmaDataDeleteMark`, 单位毫秒, 与流计算参数`deleteMark`一致, 用于控制流计算中间结果的保存时间, 默认值为: 1d, 最小值为1h. 因此那些距最后一条数据时间大于配置参数的历史数据将不保存流计算中间结果, 因此若修改这些时间窗口内的数据, TSMA的计算结果中将不包含更新的结果. 即与查询原始数据结果将不一致. + ### 查询时使用TSMA 已在 TSMA 中定义的 agg 函数在大部分查询场景下都可直接使用, 若存在多个可用的 TSMA, 优先使用大窗口的 TSMA, 未闭合窗口通过查询小窗口TSMA或者原始数据计算。 同时也有某些场景不能使用 TSMA(见下文)。 不可用时整个查询将使用原始数据进行计算。 @@ -129,4 +130,4 @@ SELECT COUNT(*), MIN(c1) FROM stable where c2 > 0; ---- can't use tsma1 or tsam2 SHOW [db_name.]TSMAS; SELECT * FROM information_schema.ins_tsma; ``` -若创建时指定的较多的函数, 且列名较长, 在显示函数列表时可能会被截断(目前最大支持输出256KB). \ No newline at end of file +若创建时指定的较多的函数, 且列名较长, 在显示函数列表时可能会被截断(目前最大支持输出256KB). diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index cb6ae8451f..01aa944d95 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -249,6 +249,15 @@ taos -C | 取值范围 | 600s - 86400s, 即10分钟-1小时 | | 缺省值 | 600s | +### tsmaDataDeleteMark + +| 属性 | 说明 | +| -------- | --------------------------- | +| 适用范围 | 仅客户端适用 | +| 含义 | TSMA计算的历史数据中间结果保存时间, 单位为毫秒 | +| 取值范围 | >= 3600000, 即大于等于1h | +| 缺省值 | 86400000, 即1d | + ## 区域相关 @@ -761,8 +770,8 @@ charset 的有效值是 UTF-8。 | -------- | --------------------------- | | 适用范围 | 仅服务端适用 | | 含义 | 集群内可创建的TSMA个数 | -| 取值范围 | 0-12 | -| 缺省值 | 8 | +| 取值范围 | 0-3 | +| 缺省值 | 3 | ## 压缩参数 diff --git a/include/common/cos.h b/include/common/cos.h index 8e48533304..17c48d594b 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); -int32_t s3GetObjectToFile(const char *object_name, char *fileName); +int32_t s3GetObjectToFile(const char *object_name, const char *fileName); #define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 3b8929f241..7c2d63e025 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -223,6 +223,7 @@ extern int32_t tmqMaxTopicNum; extern int32_t tmqRowSize; extern int32_t tsMaxTsmaNum; extern int32_t tsMaxTsmaCalcDelay; +extern int64_t tsmaDataDeleteMark; // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/common/tgrant.h b/include/common/tgrant.h index c1e37787c2..5a2ed58045 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -60,6 +60,7 @@ typedef enum { int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key); int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode); +int64_t grantRemain(EGrantType grant); int32_t grantCheck(EGrantType grant); int32_t grantCheckExpire(EGrantType grant); char *tGetMachineId(); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4423bf94b6..e3487c49d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,6 +56,7 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) +#define STREAM_EXEC_T_ADD_FAILED_TASK (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -443,6 +444,7 @@ typedef struct SDownstreamStatusInfo { typedef struct STaskCheckInfo { SArray* pList; int64_t startTs; + int64_t timeoutStartTs; int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; @@ -547,7 +549,7 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - void* qHandle; + void* qHandle; // todo remove it void* bkdChkptMgt; } SStreamMeta; @@ -885,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 460b8962ea..95f70c8ff3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcFFfp)(tmsg_t msgType); +typedef bool (*RpcNoDelayfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -118,6 +119,8 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; + RpcNoDelayfp noDelayFp; + int32_t connLimitNum; int32_t connLimitLock; int32_t timeToGetConn; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 916de6e715..03a024bb8c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -77,6 +77,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) +#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 990bfdcea3..0db6664ab9 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; } +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; } #endif diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 14e8088dfe..bf2f14339d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; @@ -193,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cad1145a6b..87b72bdead 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,7 +60,7 @@ int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; int32_t tsNumOfCommitThreads = 2; -int32_t tsNumOfTaskQueueThreads = 4; +int32_t tsNumOfTaskQueueThreads = 10; int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; @@ -308,8 +308,9 @@ int32_t tsS3UploadDelaySec = 60; bool tsExperimental = true; -int32_t tsMaxTsmaNum = 8; +int32_t tsMaxTsmaNum = 3; int32_t tsMaxTsmaCalcDelay = 600; +int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -552,12 +553,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1; - tsNumOfTaskQueueThreads = tsNumOfCores / 2; - tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + tsNumOfTaskQueueThreads = tsNumOfCores; + tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10); - if (tsNumOfTaskQueueThreads >= 50) { - tsNumOfTaskQueueThreads = 50; - } if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; @@ -571,6 +569,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxTsmaCalcDelay", tsMaxTsmaCalcDelay, 600, 86400, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; + if (cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, + CFG_DYN_CLIENT) != 0) + return -1; return 0; } @@ -751,7 +752,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 12, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 3, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1144,6 +1145,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval; tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32; + tsmaDataDeleteMark = cfgGetItem(pCfg, "tsmaDataDeleteMark")->i32; return 0; } @@ -1810,7 +1812,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"useAdapter", &tsUseAdapter}, {"experimental", &tsExperimental}, {"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags}, - {"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}}; + {"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}, + {"tsmaDataDeleteMark", &tsmaDataDeleteMark}}; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 45b0b6ac2b..3836f13a2f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { int32_t nVgroup = 0; if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit; if (nVgroup > 0) { - pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite)); + pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup); if (!pReq->pVloads) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } for (int32_t i = 0; i < nVgroup; ++i) { - SVnodeLoadLite vload; - if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit; - if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit; - taosArrayPush(pReq->pVloads, &vload); + SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i); + if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit; + if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit; } } diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e1fe69714..46f8dd06d4 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -49,7 +49,7 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); -void dmSendNotifyReq(SDnodeMgmt *pMgmt); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 355b47c10b..3ec080fb21 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -169,23 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dmProcessStatusRsp(pMgmt, &rpcRsp); } -void dmSendNotifyReq(SDnodeMgmt *pMgmt) { - SNotifyReq req = {0}; - - taosThreadRwlockRdlock(&pMgmt->pData->lock); - req.dnodeId = pMgmt->pData->dnodeId; - taosThreadRwlockUnlock(&pMgmt->pData->lock); - - req.clusterId = pMgmt->pData->clusterId; - - SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); - req.pVloads = vinfo.pVloads; - - int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); - void * pHead = rpcMallocCont(contLen); - tSerializeSNotifyReq(pHead, contLen, &req); - tFreeSNotifyReq(&req); +void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { + int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq); + void *pHead = rpcMallocCont(contLen); + tSerializeSNotifyReq(pHead, contLen, pReq); SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index d124eb74be..c48b614f96 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" +#include "tgrant.h" #include "thttp.h" static void *dmStatusThreadFp(void *param) { @@ -47,21 +48,97 @@ static void *dmStatusThreadFp(void *param) { } SDmNotifyHandle dmNotifyHdl = {.state = 0}; - +#define TIMESERIES_STASH_NUM 5 static void *dmNotifyThreadFp(void *param) { SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-notify"); if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { return NULL; } - bool wait = true; + // calculate approximate timeSeries per second + int64_t notifyTimeStamp[TIMESERIES_STASH_NUM]; + int64_t notifyTimeSeries[TIMESERIES_STASH_NUM]; + int64_t approximateTimeSeries = 0; + uint64_t nTotalNotify = 0; + int32_t head, tail = 0; + + bool wait = true; + int32_t nDnode = 0; + int64_t lastNotify = 0; + int64_t lastFetchDnode = 0; + SNotifyReq req = {0}; while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - dmSendNotifyReq(pMgmt); + + int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES); + if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) { + goto _skip; + } + int64_t current = taosGetTimestampMs(); + if (current - lastFetchDnode > 1000) { + nDnode = dmGetDnodeSize(pMgmt->pData); + if (nDnode < 1) nDnode = 1; + lastFetchDnode = current; + } + if (req.dnodeId == 0 || req.clusterId == 0) { + req.dnodeId = pMgmt->pData->dnodeId; + req.clusterId = pMgmt->pData->clusterId; + } + + if (current - lastNotify < 10) { + int64_t nCmprTimeSeries = approximateTimeSeries / 100; + if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5; + if (remainTimeSeries > nCmprTimeSeries * 10) { + taosMsleep(10); + } else if (remainTimeSeries > nCmprTimeSeries * 5) { + taosMsleep(5); + } else { + taosMsleep(2); + } + } + + SMonVloadInfo vinfo = {0}; + (*pMgmt->getVnodeLoadsLiteFp)(&vinfo); + req.pVloads = vinfo.pVloads; + int32_t nVgroup = taosArrayGetSize(req.pVloads); + int64_t nTimeSeries = 0; + for (int32_t i = 0; i < nVgroup; ++i) { + SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i); + nTimeSeries += vload->nTimeSeries; + } + notifyTimeSeries[tail] = nTimeSeries; + notifyTimeStamp[tail] = taosGetTimestampNs(); + ++nTotalNotify; + + approximateTimeSeries = 0; + if (nTotalNotify >= TIMESERIES_STASH_NUM) { + head = tail - TIMESERIES_STASH_NUM + 1; + if (head < 0) head += TIMESERIES_STASH_NUM; + int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head]; + int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head]; + if (tsDiff > 0) { + if (timeDiff > 0 && timeDiff < 1e9) { + approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff; + if ((approximateTimeSeries * nDnode) > remainTimeSeries) { + dmSendNotifyReq(pMgmt, &req); + } + } else { + dmSendNotifyReq(pMgmt, &req); + } + } + } else { + dmSendNotifyReq(pMgmt, &req); + } + if (++tail == TIMESERIES_STASH_NUM) tail = 0; + + tFreeSNotifyReq(&req); + lastNotify = taosGetTimestampMs(); + _skip: if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { wait = true; continue; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 754c42b82e..a2355ddd22 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) { return false; } } - +static bool rpcNoDelayMsg(tmsg_t msgType) { + if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || msgType == TDMT_VND_S3MIGRATE || + msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) { + return true; + } + return false; +} int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; + rpcInit.noDelayFp = rpcNoDelayMsg; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); @@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0344ca685d..9fdd5e50ed 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir); int32_t dmInitDndInfo(SDnodeData *pData); // dmEps.c +int32_t dmGetDnodeSize(SDnodeData *pData); int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 4b41b17cb1..c585a780ac 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -355,6 +355,14 @@ _OVER: return code; } +int32_t dmGetDnodeSize(SDnodeData *pData) { + int32_t size = 0; + taosThreadRwlockRdlock(&pData->lock); + size = taosArrayGetSize(pData->dnodeEps); + taosThreadRwlockUnlock(&pData->lock); + return size; +} + void dmUpdateEps(SDnodeData *pData, SArray *eps) { taosThreadRwlockWrlock(&pData->lock); dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer); diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index cce386785a..8dae4b3c11 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); } void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {} void grantAdd(EGrantType grant, uint64_t value) {} void grantRestore(EGrantType grant, uint64_t value) {} +int64_t grantRemain(EGrantType grant) { return 0; } char *tGetMachineId() { return NULL; }; int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 22d2eb5a59..fafdb539fb 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -22,10 +22,10 @@ #include "mndPrivilege.h" #include "mndQnode.h" #include "mndShow.h" +#include "mndSma.h" #include "mndStb.h" #include "mndUser.h" #include "mndView.h" -#include "mndSma.h" #include "tglobal.h" #include "tversion.h" @@ -57,6 +57,13 @@ typedef struct { int64_t lastAccessTimeMs; } SAppObj; +typedef struct { + int32_t totalDnodes; + int32_t onlineDnodes; + SEpSet epSet; + SArray *pQnodeList; +} SConnPreparedObj; + static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); @@ -460,7 +467,7 @@ static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) { } static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, - SClientHbBatchRsp *pBatchRsp) { + SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SRpcConnInfo connInfo = pMsg->info.conn; @@ -503,11 +510,11 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } rspBasic->connId = pConn->id; - rspBasic->totalDnodes = mndGetDnodeSize(pMnode); - mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes); - mndGetMnodeEpSet(pMnode, &rspBasic->epSet); - - mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); + rspBasic->connId = pConn->id; + rspBasic->totalDnodes = pObj->totalDnodes; + rspBasic->onlineDnodes = pObj->onlineDnodes; + rspBasic->epSet = pObj->epSet; + rspBasic->pQnodeList = taosArrayDup(pObj->pQnodeList, NULL); mndReleaseConn(pMnode, pConn, true); @@ -608,7 +615,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } #endif case HEARTBEAT_KEY_TSMA: { - void * rspMsg = NULL; + void *rspMsg = NULL; int32_t rspLen = 0; mndValidateTSMAInfo(pMnode, kv->value, kv->valueLen / sizeof(STSMAVersion), &rspMsg, &rspLen); if (rspMsg && rspLen > 0) { @@ -641,6 +648,12 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { return -1; } + SConnPreparedObj obj = {0}; + obj.totalDnodes = mndGetDnodeSize(pMnode); + mndGetOnlineDnodeNum(pMnode, &obj.onlineDnodes); + mndGetMnodeEpSet(pMnode, &obj.epSet); + mndCreateQnodeList(pMnode, &obj.pQnodeList, -1); + SClientHbBatchRsp batchRsp = {0}; batchRsp.svrTimestamp = taosGetTimestampSec(); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); @@ -649,7 +662,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { for (int i = 0; i < sz; i++) { SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); if (pHbReq->connKey.connType == CONN_TYPE__QUERY) { - mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp); + mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp, &obj); } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { @@ -668,6 +681,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { pReq->info.rspLen = tlen; pReq->info.rsp = buf; + taosArrayDestroy(obj.pQnodeList); + return 0; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 02c932289f..5c8bb22c22 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1455,6 +1455,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->targetStbUid = 0; pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->igUpdate = 0; + pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark; pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs; pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid; pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast); @@ -2340,11 +2341,6 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) { } _OVER: - if (code != 0) { - mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(), - tsmaReq.fetchingWithTsmaName); - } - tFreeTableTSMAInfoRsp(&rsp); return code; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b50ed095bd..61fc180dc6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -4216,7 +4216,10 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) { SMnode *pMnode = pRsp->info.node; SVFetchTtlExpiredTbsRsp rsp = {0}; SMndDropTbsWithTsmaCtx *pCtx = NULL; - if (pRsp->code != TSDB_CODE_SUCCESS) goto _end; + if (pRsp->code != TSDB_CODE_SUCCESS) { + terrno = pRsp->code; + goto _end; + } if (pRsp->contLen == 0) { code = 0; goto _end; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3dc26b4118..c87b8e84f4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint backup type + char backup[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(backup, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); + + // history scan idle char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); @@ -1644,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); - // checkpoint info + // checkpoint version pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + // checkpoint backup status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + // ds_err_info pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, 0, true); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 04c0c0d204..924b0a8207 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +typedef struct SMStreamCheckpointReadyRspMsg { + SMsgHead head; +} SMStreamCheckpointReadyRspMsg; + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - int64_t initTs = 0; - int64_t now = taosGetTimestampMs(); - STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId}; - STaskId fId = {0}; - bool hasHistoryTask = false; - - // todo extract method if (!isLeader) { - // this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map. - streamMetaRLock(pMeta); - - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - } else { - streamMetaRUnLock(pMeta); - - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - return code; + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { - streamMetaRLock(pMeta); - - // let's try to find this task in hashmap - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - setParam(*ppTask, &initTs, &hasHistoryTask, &fId); - streamMetaRUnLock(pMeta); - - if (hasHistoryTask) { - streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); - } - } else { // not exist even in the hash map of meta, forget it - streamMetaRUnLock(pMeta); - } - - streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); - - code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return code; + return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId); } code = streamProcessCheckRsp(pTask, &rsp); @@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } -typedef struct SMStreamCheckpointReadyRspMsg { - SMsgHead head; -} SMStreamCheckpointReadyRspMsg; - int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { streamMetaStopAllTasks(pMeta); return 0; + } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { + int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index bcd1ab5c18..67c4a8d875 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3169,7 +3169,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "hyperloglog", .type = FUNCTION_TYPE_HYPERLOGLOG, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC, .translateFunc = translateHLL, .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, @@ -3181,7 +3181,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { #endif .combineFunc = hllCombine, .pPartialFunc = "_hyperloglog_partial", - .pStateFunc = "_hyperloglog_state", .pMergeFunc = "_hyperloglog_merge" }, { @@ -3211,7 +3210,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = NULL, #endif .combineFunc = hllCombine, - .pPartialFunc = "_hyperloglog_state_merge", .pMergeFunc = "_hyperloglog_merge", }, { @@ -4087,8 +4085,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunctionMerge, .finalizeFunc = firstLastPartialFinalize, }, - { - .name = "_hyperloglog_state", + { .name = "_hyperloglog_state", .type = FUNCTION_TYPE_HYPERLOGLOG_STATE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC, .translateFunc = translateHLLState, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8f77f0dedf..8329860063 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10928,7 +10928,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i; pReq->intervalUnit = TIME_UNIT_MILLISECOND; -#define TSMA_MIN_INTERVAL_MS 1 // 1ms +#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m #define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) { return TSDB_CODE_TSMA_INVALID_INTERVAL; @@ -10989,6 +10989,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm } if (TSDB_CODE_SUCCESS == code) { + pReq->deleteMark = convertTimePrecision(tsmaDataDeleteMark, TSDB_TIME_PRECISION_MILLI, pTableMeta->tableInfo.precision); code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 07dce9a451..0ee31197dc 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); -void streamTaskSetCheckpointFailedId(SStreamTask* pTask); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskGetTaskId(const SStreamTask* pTask); @@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); -int32_t deleteCheckpointFile(char* id, char* name); -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 0a87833055..ea9b2ef89f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); + stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); @@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { pInfo->stopCheckProcess = 1; taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set stop check rsp mon", id); + stDebug("s-task:%s set stop check-rsp monit", id); return TSDB_CODE_SUCCESS; } @@ -272,6 +272,8 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; + pInfo->timeoutStartTs = startTs; + pInfo->stopCheckProcess = 0; return TSDB_CODE_SUCCESS; } @@ -293,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -329,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { return TSDB_CODE_FAILED; } - stDebug("s-task:%s set the in-check-procedure flag", id); + stDebug("s-task:%s set the in check-rsp flag", id); return TSDB_CODE_SUCCESS; } @@ -343,9 +345,10 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* } int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; - stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; + pInfo->timeoutStartTs = 0; pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; @@ -458,6 +461,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); ASSERT(pTask->status.downstreamReady == 0); + pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); @@ -488,7 +492,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); } else { stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs); } } @@ -517,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } +// the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. +// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution +// of restart in timer thread will result in a dead lock. +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); + return -1; + } + + stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return 0; +} + +// this function is executed in timer thread void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamMeta* pMeta = pTask->pMeta; @@ -524,7 +552,7 @@ void rspMonitorFn(void* param, void* tmrId) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); - int64_t el = now - pInfo->startTs; + int64_t timeoutDuration = now - pInfo->timeoutStartTs; ETaskStatus state = pStat->state; const char* id = pTask->id.idStr; int32_t numOfReady = 0; @@ -541,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); - - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return; @@ -577,7 +600,7 @@ void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -614,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - + addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); @@ -639,8 +656,10 @@ void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", - id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug( + "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, " + "ready:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ec96ad7108..824a33dd13 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -26,6 +26,9 @@ typedef struct { SStreamTask* pTask; } SAsyncUploadArg; +static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); +static int32_t deleteCheckpointFile(char* id, char* name); + int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -376,21 +379,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { return code; } -void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); } -int32_t getChkpMeta(char* id, char* path, SArray* list) { +static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { char* file = taosMemoryCalloc(1, strlen(path) + 32); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); - int32_t code = downloadCheckpointByName(id, "META", file); + + int32_t code = downloadCheckpointDataByName(id, "META", file); if (code != 0) { stDebug("chkp failed to download meta file:%s", file); taosMemoryFree(file); return code; } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); char buf[128] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { @@ -428,7 +433,7 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { + if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId); } } @@ -457,8 +462,7 @@ int32_t uploadCheckpointData(void* param) { return code; } -int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { - // async upload +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -514,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadChkp(pTask, ckId, (char*)id); + code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } @@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } @@ -585,12 +589,12 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { stDebug("[s3] upload checkpoint:%s", filename); // break; } - taosCloseDir(&pDir); + taosCloseDir(&pDir); return 0; } -static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { +static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); @@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) { } // fileName: CURRENT -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return 0; } else if (tsS3StreamEnabled) { return downloadCheckpointByNameS3(id, fname, dstName); } + return 0; } @@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) { stError("downloadCheckpoint parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return downloadRsync(id, path); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); } + return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 891e0aa142..250866005e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t doStreamExecTask(SStreamTask* pTask) { +static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a464594233..edc1a148a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } else { stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); } @@ -1706,4 +1706,39 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { } return 0; +} + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + + streamMetaRLock(pMeta); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; + + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } + } else { + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index c76536aedf..b3df5755ea 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); } @@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index cc2c0d4e84..7853e25cff 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,6 +63,7 @@ typedef struct { bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); + bool (*noDelayFp)(tmsg_t msgType); int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f658947144..5ed2e00acd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; + pRpc->noDelayFp = pInit->noDelayFp; pRpc->connLimitNum = pInit->connLimitNum; if (pRpc->connLimitNum == 0) { pRpc->connLimitNum = 20; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4da1f04cd9..dfd7630f35 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) { transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); pMsg->ctx->task = NULL; - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; + if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) { + tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), + tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + *pMsg = NULL; + return NULL; + } + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = *pMsg; arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; } else { @@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) { } } -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; STransMsg transMsg = {0}; transMsg.contLen = 0; transMsg.pCont = NULL; - transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; + transMsg.code = code; transMsg.msgType = pMsg->msg.msgType + 1; transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; @@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; STrans* pTransInst = pThrd->pTransInst; - + int32_t code = TSDB_CODE_RPC_MAX_SESSIONS; QUEUE_REMOVE(&pMsg->q); STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ab5d3da781..3ef656b2b4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") diff --git a/tests/army/enterprise/s3/s3Basic.py b/tests/army/enterprise/s3/s3Basic.py index 8045a3f308..9634b8edb0 100644 --- a/tests/army/enterprise/s3/s3Basic.py +++ b/tests/army/enterprise/s3/s3Basic.py @@ -103,7 +103,7 @@ class TDTestCase(TBase): loop = 0 rets = [] overCnt = 0 - while loop < 100: + while loop < 200: time.sleep(3) # check upload to s3 diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3d1e8d2250..3fca381fda 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -381,6 +381,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29793.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index b46326bb3c..00171a19a6 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -51,6 +51,8 @@ class TDSql: def init(self, cursor, log=True): self.cursor = cursor + self.sql = None + print(f"sqllog is :{log}") if (log): caller = inspect.getframeinfo(inspect.stack()[1][0]) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 56ef8c6b47..9a112c669e 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) diff --git a/tests/system-test/1-insert/test_td29793.py b/tests/system-test/1-insert/test_td29793.py new file mode 100644 index 0000000000..cdcaa244bb --- /dev/null +++ b/tests/system-test/1-insert/test_td29793.py @@ -0,0 +1,88 @@ +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.csv import * +import os +import taos +import json +from taos import SmlProtocol, SmlPrecision +from taos.error import SchemalessError + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), True) + + + def run(self): + conn = taos.connect() + + conn.execute("drop database if exists reproduce") + conn.execute("CREATE DATABASE reproduce") + conn.execute("USE reproduce") + + # influxDB + conn.execute("drop table if exists meters") + lines1 = ["meters,location=California.LosAngeles groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249000",] + lines2 = ["meters,location=California.LosAngeles,groupid=2 groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249001",] + lines3 = ["meters,location=California.LosAngeles,groupid=2 current=11i32,voltage=221,phase=0.28 1648432611249002",] + + try: + conn.schemaless_insert(lines1, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + conn.schemaless_insert(lines2, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + conn.schemaless_insert(lines3, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + + # OpenTSDB + conn.execute("drop table if exists meters") + lines1 = ["meters 1648432611249 10i32 location=California.SanFrancisco groupid=2 groupid=3",] + lines2 = ["meters 1648432611250 10i32 groupid=2 location=California.SanFrancisco groupid=3",] + + try: + conn.schemaless_insert(lines1, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + conn.schemaless_insert(lines2, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + tdSql.checkEqual('expected error', 'no error occurred') + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + # OpenTSDB Json + conn.execute("drop table if exists meters") + lines1 = [{"metric": "meters", "timestamp": 1648432611249, "value": "a32", "tags": {"location": "California.SanFrancisco", "groupid": 2, "groupid": 3}}] + lines2 = [{"metric": "meters", "timestamp": 1648432611250, "value": "a32", "tags": {"groupid": 2, "location": "California.SanFrancisco", "groupid": 4}}] + try: + lines = json.dumps(lines1) + conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + # tdSql.checkEqual('expected error', 'no error occurred') TD-29850 + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + try: + lines = json.dumps(lines2) + conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED) + # tdSql.checkEqual('expected error', 'no error occurred') TD-29850 + except SchemalessError as errMsg: + tdSql.checkEqual(errMsg.msg, 'Duplicated column names') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 422c9a2f1d..38cb1504f6 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -11,6 +11,8 @@ from util.common import * ROUND = 1000 +ignore_some_tests: int = 1 + class TSMA: def __init__(self): self.tsma_name = '' @@ -601,7 +603,7 @@ class TSMATestSQLGenerator: class TDTestCase: - updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 8} + updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3} def __init__(self): self.vgroups = 4 @@ -802,7 +804,8 @@ class TDTestCase: self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() - self.test_recursive_tsma() + if not ignore_some_tests: + self.test_recursive_tsma() self.test_query_interval_sliding() self.test_union() self.test_query_child_table() @@ -812,7 +815,8 @@ class TDTestCase: self.test_add_tag_col() self.test_modify_col_name_value() self.test_alter_tag_val() - self.test_ins_tsma() + if not ignore_some_tests: + self.test_ins_tsma() def test_ins_tsma(self): tdSql.execute('use performance_schema') @@ -862,17 +866,17 @@ class TDTestCase: .should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999') .should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.800').get_qc()) self.check(ctxs) - - tdSql.execute('create database db2') - tdSql.execute('use db2') - tdSql.execute('create table db2.norm_tb(ts timestamp, c2 int)') - tdSql.execute('insert into db2.norm_tb values(now, 1)') - tdSql.execute('insert into db2.norm_tb values(now, 2)') - self.create_tsma('tsma_db2_norm_t', 'db2', 'norm_tb', ['avg(c2)', 'last(ts)'], '10m') - sql = 'select avg(c1) as avg_c1 from test.meters union select avg(c2) from db2.norm_tb order by avg_c1' - self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('db2', 'tsma_db2_norm_t', 'norm_tb').get_qc()]) - tdSql.execute('drop database db2') - tdSql.execute('use test') + if not ignore_some_tests: + tdSql.execute('create database db2') + tdSql.execute('use db2') + tdSql.execute('create table db2.norm_tb(ts timestamp, c2 int)') + tdSql.execute('insert into db2.norm_tb values(now, 1)') + tdSql.execute('insert into db2.norm_tb values(now, 2)') + self.create_tsma('tsma_db2_norm_t', 'db2', 'norm_tb', ['avg(c2)', 'last(ts)'], '10m') + sql = 'select avg(c1) as avg_c1 from test.meters union select avg(c2) from db2.norm_tb order by avg_c1' + self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('db2', 'tsma_db2_norm_t', 'norm_tb').get_qc()]) + tdSql.execute('drop database db2') + tdSql.execute('use test') def test_modify_col_name_value(self): tdSql.error('alter table test.norm_tb rename column c1 c1_new', -2147471088) ## tsma must be dropped @@ -898,8 +902,11 @@ class TDTestCase: return result_str def test_long_tsma_name(self): + self.drop_tsma('tsma2', 'test') name = self.generate_random_string(178) - tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)', 'last(ts)'] + tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'last(ts)'] + if not ignore_some_tests: + tsma_func_list.append('hyperloglog(c2)') self.create_tsma(name, 'test', 'meters', tsma_func_list, '55m') sql = 'select last(c5), spread(c2) from test.meters interval(55m)' ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(name).get_qc() @@ -953,7 +960,9 @@ class TDTestCase: def test_recursive_tsma(self): tdSql.execute('drop tsma test.tsma2') - tsma_func_list = ['last(ts)', 'avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)'] + tsma_func_list = ['last(ts)', 'avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)'] + if not ignore_some_tests: + tsma_func_list.append('hyperloglog(c2)') select_func_list: List[str] = tsma_func_list.copy() select_func_list.append('count(*)') self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m') @@ -1388,37 +1397,39 @@ class TDTestCase: 'create tsma tsma_error_interval on nsdb.meters function(count(c2)) interval(10s,10m)') tdSql.error( 'create tsma tsma_error_interval on nsdb.meters function(count(c2)) interval(10s) sliding(1m)') - - # max tsma num 8 - self.create_tsma('tsma2', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '10s') - self.create_tsma('tsma_test3', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '100s') - self.create_tsma('tsma4', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '101s') - self.create_tsma('tsma5', 'nsdb', 'meters', ['avg(c1)', 'count(ts)'], '102s') - self.create_tsma('tsma6', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '103s') - self.create_tsma('tsma7', 'nsdb', 'meters', ['avg(c1)', 'count(c2)'], '104s') - self.create_tsma('tsma8', 'test', 'meters', ['avg(c1)', 'sum(c2)'], '105s') - tdSql.error('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(99s)', -2147482490) - tdSql.error('create recursive tsma tsma9 on test.tsma8 interval(210s)', -2147482490) - - # modify maxTsmaNum para - tdSql.error('alter dnode 1 "maxTsmaNum" "13";') - tdSql.error('alter dnode 1 "maxTsmaNum" "-1";') - - # tdSql.error('alter dnode 1 "maxTsmaNum" "abc";') - # tdSql.error('alter dnode 1 "maxTsmaNum" "1.2";') - tdSql.execute("alter dnode 1 'maxTsmaNum' '0';", queryTimes=1) - tdSql.error('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(99s)', -2147482490) - tdSql.execute("alter dnode 1 'maxTsmaNum' '12';", queryTimes=1) - tdSql.execute('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(109s)') - tdSql.execute('create tsma tsma10 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(110s)') - tdSql.execute('create tsma tsma11 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(111s)') - tdSql.execute('create tsma tsma12 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(112s)') - tdSql.query("show nsdb.tsmas", queryTimes=1) - print(tdSql.queryResult) - tdSql.query("show test.tsmas", queryTimes=1) - print(tdSql.queryResult) - tdSql.error('create tsma tsma13 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(113s)', -2147482490) + + if not ignore_some_tests: + # max tsma num 8 + self.create_tsma('tsma2', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '10s') + self.create_tsma('tsma_test3', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '100s') + self.create_tsma('tsma4', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '101s') + self.create_tsma('tsma5', 'nsdb', 'meters', ['avg(c1)', 'count(ts)'], '102s') + self.create_tsma('tsma6', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '103s') + self.create_tsma('tsma7', 'nsdb', 'meters', ['avg(c1)', 'count(c2)'], '104s') + self.create_tsma('tsma8', 'test', 'meters', ['avg(c1)', 'sum(c2)'], '105s') + tdSql.error('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(99s)', -2147482490) + tdSql.error('create recursive tsma tsma9 on test.tsma8 interval(210s)', -2147482490) + + # modify maxTsmaNum para + tdSql.error('alter dnode 1 "maxTsmaNum" "13";') + tdSql.error('alter dnode 1 "maxTsmaNum" "-1";') + + # tdSql.error('alter dnode 1 "maxTsmaNum" "abc";') + # tdSql.error('alter dnode 1 "maxTsmaNum" "1.2";') + + tdSql.execute("alter dnode 1 'maxTsmaNum' '0';", queryTimes=1) + tdSql.error('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(99s)', -2147482490) + tdSql.execute("alter dnode 1 'maxTsmaNum' '12';", queryTimes=1) + tdSql.execute('create tsma tsma9 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(109s)') + tdSql.execute('create tsma tsma10 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(110s)') + tdSql.execute('create tsma tsma11 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(111s)') + tdSql.execute('create tsma tsma12 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(112s)') + tdSql.query("show nsdb.tsmas", queryTimes=1) + print(tdSql.queryResult) + tdSql.query("show test.tsmas", queryTimes=1) + print(tdSql.queryResult) + tdSql.error('create tsma tsma13 on nsdb.meters function(count(ts), count(c1), sum(c2)) interval(113s)', -2147482490) # drop tsma @@ -1525,6 +1536,7 @@ class TDTestCase: tdSql.error("CREATE TSMA T*\-sma129_ ON test.meters FUNCTION(count(c1)) INTERVAL(5m); ", -2147473920) tdSql.error("CREATE TSMA Tsma_repeat ON test.meters FUNCTION(count(c1)) INTERVAL(5m); ", -2147482496) + self.drop_tsma('tsma_repeat', 'test') # tsma name include escape character tdSql.execute("CREATE TSMA `129_tsma` ON test.meters FUNCTION(count(c3)) INTERVAL(5m); ") @@ -1534,9 +1546,6 @@ class TDTestCase: tdSql.execute("drop tsma test.`129_Tsma`") tdSql.execute("drop tsma test.`129_T*\-sma`") - self.drop_tsma('tsma_repeat', 'test') - - def test_create_and_drop_tsma(self, tsma_name: str, db_name: str = 'test', table_name: str = 'meters', func_list: List = ['avg(c1)', 'avg(c2)'], interval: str = '5m'): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------') diff --git a/tests/system-test/7-tmq/ins_topics_test.py b/tests/system-test/7-tmq/ins_topics_test.py index 8bf0a7e91a..1628e51166 100644 --- a/tests/system-test/7-tmq/ins_topics_test.py +++ b/tests/system-test/7-tmq/ins_topics_test.py @@ -38,6 +38,14 @@ class TDTestCase: tdSql.execute("create topic topic_2 with meta as stable db1.st") tdSql.execute("create topic topic_3 as select * from db1.nt") tdSql.execute("create topic topic_4 as select ts,c3,c5,t2 from db1.st") + for i in range(5, 21): + tdSql.execute(f"create topic topic_{i} as select ts,c3,c5,t2 from db1.st") + + tdSql.error("create topic topic_21 as select * from db1.nt") + tdSql.execute("create topic if not exists topic_1 as database db1") + for i in range(5, 21): + tdSql.execute(f"drop topic topic_{i}") + tdSql.query("select * from information_schema.ins_topics order by topic_name") tdSql.checkRows(4)