diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index 42a9f3af5a..f833dbf439 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -23,20 +23,30 @@ By subscribing to a topic, a consumer can obtain the latest data in that topic i To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers. -Tips:(c interface for example) -1. A consumption group consumes all data under the same topic, and different consumption groups are independent of each other; -2. A consumption group consumes all vgroups of the same topic, which can be composed of multiple consumers, but a vgroup is only consumed by one consumer. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume data; -3. On the server side, only one offset is saved for each vgroup, and the offsets for each vgroup are monotonically increasing, but not necessarily continuous. There is no correlation between the offsets of various vgroups; -4. Each poll server will return a result block, which belongs to a vgroup and may contain data from multiple versions of wal. This block can be accessed through tmq_get_vgroup_offset. The offset interface obtains the offset of the first record in the block; -5. If a consumer group has never committed an offset, when its member consumers restart and pull data again, they start consuming from the set value of the parameter auto.offset.reset; In a consumer lifecycle, the client locally records the offset of the most recent pull data and will not pull duplicate data; -6. If a consumer terminates abnormally (without calling tmq_close), they need to wait for about 12 seconds to trigger their consumer group rebalance. The consumer's status on the server will change to LOST, and after about 1 day, the consumer will be automatically deleted; Exit normally, and after exiting, the consumer will be deleted; Add a new consumer, wait for about 2 seconds to trigger Rebalance, and the consumer's status on the server will change to ready; -7. The consumer group Rebalance will reassign Vgroups to all consumer members in the ready state of the group, and consumers can only assign/see/commit/poll operations to the Vgroups they are responsible for; -8. Consumers can tmq_position to obtain the offset of the current consumption, seek to the specified offset, and consume again; -9. Seek points the position to the specified offset without executing the commit operation. Once the seek is successful, it can poll the specified offset and subsequent data; -10. Before the seek operation, tmq must be call tmq_get_topic_assignment, The assignment interface obtains the vgroup ID and offset range of the consumer. 
The seek operation will detect whether the vgroup ID and offset are legal, and if they are illegal, an error will be reported; 
-11. Due to the existence of a WAL expiration deletion mechanism, even if the seek operation is successful, it is possible that the offset has expired when polling data. If the offset of poll is less than the WAL minimum version number, it will be consumed from the WAL minimum version number; 
-12. The tmq_get_vgroup_offset interface obtains the offset of the first data in the result block where the record is located. When seeking to this offset, it will consume all the data in this block. Refer to point four; 
-13. Data subscription is to consume data from the wal. If some wal files are deleted according to WAL retention policy, the deleted data can't be consumed any more. So you need to set a reasonable value for parameter `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consume the data in a timely way to make sure there is no data loss. This behavior is similar to Kafka and other widely used message queue products. 
+The following notes on data subscription assume some familiarity with the TDengine architecture and with the interfaces exposed by the language connectors (two C sketches follow these notes).
+- A consumer group consumes all data under the same topic, and different consumer groups are independent of each other;
+- A consumer group consumes all vgroups of the same topic and may consist of multiple consumers, but each vgroup is consumed by only one consumer. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume data;
+- On the server side, only one offset is saved per vgroup. The offsets of a vgroup increase monotonically but are not necessarily continuous, and there is no correlation between the offsets of different vgroups;
+- Each poll returns one result block from the server. The block belongs to a single vgroup and may contain data from multiple WAL versions; the offset of the first record in the block can be obtained through the offset interface;
+- If a consumer group has never committed an offset, its member consumers start consuming from the position given by the parameter auto.offset.reset when they restart and pull data again. Within a consumer's lifecycle, the client records the offset of the most recent pull locally and does not pull duplicate data;
+- If a consumer terminates abnormally (without calling tmq_close), it takes about 12 seconds to trigger a rebalance of its consumer group; the consumer's status on the server changes to LOST, and after about 1 day the consumer is deleted automatically. If a consumer exits normally, it is deleted right after exiting. When a new consumer is added, a rebalance is triggered after about 2 seconds, and the consumer's status on the server changes to ready;
+- A consumer group rebalance reassigns vgroups to all consumers of the group that are in the ready state; a consumer can perform assignment/seek/commit/poll operations only on the vgroups it is responsible for;
+- A consumer can call the position interface to obtain its current consumption offset, seek to a specified offset, and consume again;
+- Seek moves the position to the specified offset without performing a commit. Once the seek succeeds, subsequent polls return data starting from that offset;
+- Position returns the position to be consumed next, not the position that has already been consumed;
+- Commit submits a consumption position. Called without a parameter, it commits the current position (the position to be consumed next, not the one already consumed); called with a parameter, it commits the position given in the parameter (that is, the position from which consumption resumes after the next exit and restart);
+- Seek sets the consumer's consumption position; wherever you seek to, position returns that offset, since both refer to the position to be consumed next;
+- Seek does not affect commit, and commit does not affect seek; the two are independent concepts;
+- The begin interface returns the offset of the first record in the WAL, and the end interface returns the offset of the last record in the WAL plus 1;
+- Before seeking, the assignment interface must be called to obtain the consumer's vgroup IDs and offset ranges. The seek operation then checks whether the vgroup ID and offset are valid and reports an error if they are not;
+- Because of the WAL expiration and deletion mechanism, an offset may have expired by the time data is polled even though the seek succeeded. If the polled offset is smaller than the minimum WAL version number, consumption starts from the minimum WAL version number;
+- The offset interface returns the offset of the first record of the result block that contains a given record; seeking to this offset therefore consumes all data in that block again. See the fourth point above;
+- Data subscription consumes data from the WAL. If WAL files are deleted according to the WAL retention policy, the deleted data can no longer be consumed. You therefore need to set reasonable values for the parameters `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consumes the data in a timely manner, so that no data is lost. This behavior is similar to Kafka and other widely used message queue products.
+
+This document does not further introduce message queues themselves; please refer to other sources if you need more background.
+
+Starting from version 3.2.0.0, data subscription supports vnode migration and splitting.
+Because data subscription depends on WAL files, and the WAL is not synchronized during vnode migration or splitting, WAL data that was not fully consumed before the operation can no longer be consumed afterward. Please make sure all data has been consumed before migrating or splitting a vnode; otherwise data will be lost during consumption.
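+The notes above can be made concrete with two short sketches against the C connector (other connectors expose equivalent interfaces). They are sketches only: the topic name `topic_meters`, the group ID, and the configuration values are placeholders, and error handling is elided. The first sketch subscribes, polls, reads the offset of the first record of each result block, and commits:
+
+```c
+#include <inttypes.h>
+#include <stdio.h>
+#include "taos.h"
+
+// Minimal consume loop; "topic_meters" and all configuration values are
+// placeholders -- adapt them to your deployment.
+static void consume_loop(void) {
+  tmq_conf_t *conf = tmq_conf_new();
+  tmq_conf_set(conf, "group.id", "g1");                // consumer group
+  tmq_conf_set(conf, "auto.offset.reset", "earliest"); // used only if the group has never committed
+  tmq_conf_set(conf, "enable.auto.commit", "false");
+
+  char   errstr[256] = {0};
+  tmq_t *tmq = tmq_consumer_new(conf, errstr, sizeof(errstr));
+  tmq_conf_destroy(conf);
+
+  tmq_list_t *topics = tmq_list_new();
+  tmq_list_append(topics, "topic_meters");
+  if (tmq_subscribe(tmq, topics) != 0) { /* handle error */ }
+  tmq_list_destroy(topics);
+
+  for (int32_t i = 0; i < 100; ++i) {
+    TAOS_RES *res = tmq_consumer_poll(tmq, 1000 /* ms */);
+    if (res == NULL) continue;  // no new data in this round
+    // each result block belongs to one vgroup; tmq_get_vgroup_offset returns
+    // the offset of the first record in the block
+    printf("vgId=%d first-offset=%" PRId64 "\n", tmq_get_vgroup_id(res), tmq_get_vgroup_offset(res));
+    // ... process rows with taos_fetch_row(res) ...
+    tmq_commit_sync(tmq, res);  // commits the position to be consumed next
+    taos_free_result(res);
+  }
+
+  tmq_unsubscribe(tmq);
+  tmq_close(tmq);  // a normal close removes the consumer right away
+}
+```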
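+The second sketch shows the assignment/seek/position interplay: fetch the assignment first, then seek each vgroup back to its begin offset. It assumes `tmq` is a consumer already subscribed to `topic`, as in the previous sketch:
+
+```c
+#include <inttypes.h>
+#include <stdio.h>
+#include "taos.h"
+
+// Rewind one topic to the oldest data still in the WAL.
+static void rewind_topic(tmq_t *tmq, const char *topic) {
+  tmq_topic_assignment *assignment = NULL;
+  int32_t               num = 0;
+  // the assignment must be fetched before seeking; it carries each vgroup's
+  // ID, its current offset, and the [begin, end) offset range in the WAL
+  if (tmq_get_topic_assignment(tmq, topic, &assignment, &num) != 0) return;
+
+  for (int32_t i = 0; i < num; ++i) {
+    tmq_topic_assignment *a = &assignment[i];
+    printf("vgId=%d position=%" PRId64 " begin=%" PRId64 " end=%" PRId64 "\n", a->vgId,
+           tmq_position(tmq, topic, a->vgId), a->begin, a->end);
+    // seek validates the vgroup ID and offset and fails if they are out of
+    // range; it only moves the position and performs no commit
+    tmq_offset_seek(tmq, topic, a->vgId, a->begin);
+  }
+  tmq_free_assignment(assignment);
+}
+```
+
+After the rewind, later polls return data from each vgroup's begin offset; if the WAL was truncated in the meantime, consumption starts from the smallest retained version instead. Nothing is committed until a commit interface is called explicitly.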
## Data Schema and API diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index a73d43cd04..927d762829 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -23,22 +23,30 @@ import CDemo from "./_sub_c.mdx"; 为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 -本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 +下面为关于数据订阅的一些说明,需要对TDengine的架构有一些了解,结合各个语言链接器的接口使用。 +- 一个消费组消费同一个topic下的所有数据,不同消费组之间相互独立; +- 一个消费组消费同一个topic所有的vgroup,消费组可由多个消费者组成,但一个vgroup仅被一个消费者消费,如果消费者数量超过了vgroup数量,多余的消费者不消费数据; +- 在服务端每个vgroup仅保存一个offset,每个vgroup的offset是单调递增的,但不一定连续。各个vgroup的offset之间没有关联; +- 每次poll服务端会返回一个结果block,该block属于一个vgroup,可能包含多个wal版本的数据,可以通过 offset 接口获得是该block第一条记录的offset; +- 一个消费组如果从未commit过offset,当其成员消费者重启重新拉取数据时,均从参数auto.offset.reset设定值开始消费;在一个消费者生命周期中,客户端本地记录了最近一次拉取数据的offset,不会拉取重复数据; +- 消费者如果异常终止(没有调用tmq_close),需等约12秒后触发其所属消费组rebalance,该消费者在服务端状态变为LOST,约1天后该消费者自动被删除;正常退出,退出后就会删除消费者;新增消费者,需等约2秒触发rebalance,该消费者在服务端状态变为ready; +- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配,消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作; +- 消费者可利用 position 获得当前消费的offset,并seek到指定offset,重新消费; +- seek将position指向指定offset,不执行commit操作,一旦seek成功,可poll拉取指定offset及以后的数据; +- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法,如非法将报错; +- position是获取当前的消费位置,是下次要取的位置,不是当前消费到的位置 +- commit是提交消费位置,不带参数的话,是提交当前消费位置(下次要取的位置,不是当前消费到的位置),带参数的话,是提交参数里的位置(也即下次退出重启后要取的位置) +- seek是设置consumer消费位置,seek到哪,position就返回哪,都是下次要取的位置 +- seek不会影响commit,commit不影响seek,相互独立,两个是不同的概念 +- begin接口为wal 第一条数据的offset,end 接口为wal 最后一条数据的offset + 1 +- offset接口获取的是记录所在结果block块里的第一条数据的offset,当seek至该offset时,将消费到这个block里的全部数据。参见第四点; +- 由于存在 WAL 过期删除机制,即使seek 操作成功,poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号,将会从WAL最小版本号消费; +- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似; -说明(以c接口为例): -1. 一个消费组消费同一个topic下的所有数据,不同消费组之间相互独立; -2. 一个消费组消费同一个topic所有的vgroup,消费组可由多个消费者组成,但一个vgroup仅被一个消费者消费,如果消费者数量超过了vgroup数量,多余的消费者不消费数据; -3. 在服务端每个vgroup仅保存一个offset,每个vgroup的offset是单调递增的,但不一定连续。各个vgroup的offset之间没有关联; -4. 每次poll服务端会返回一个结果block,该block属于一个vgroup,可能包含多个wal版本的数据,可以通过 tmq_get_vgroup_offset 接口获得是该block第一条记录的offset; -5. 一个消费组如果从未commit过offset,当其成员消费者重启重新拉取数据时,均从参数auto.offset.reset设定值开始消费;在一个消费者生命周期中,客户端本地记录了最近一次拉取数据的offset,不会拉取重复数据; -6. 消费者如果异常终止(没有调用tmq_close),需等约12秒后触发其所属消费组rebalance,该消费者在服务端状态变为LOST,约1天后该消费者自动被删除;正常退出,退出后就会删除消费者;新增消费者,需等约2秒触发rebalance,该消费者在服务端状态变为ready; -7. 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配,消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作; -8. 消费者可利用 tmq_position 获得当前消费的offset,并seek到指定offset,重新消费; -9. seek将position指向指定offset,不执行commit操作,一旦seek成功,可poll拉取指定offset及以后的数据; -10. seek 操作之前须调用 tmq_get_topic_assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法,如非法将报错; -11. tmq_get_vgroup_offset接口获取的是记录所在结果block块里的第一条数据的offset,当seek至该offset时,将消费到这个block里的全部数据。参见第四点; -12. 由于存在 WAL 过期删除机制,即使seek 操作成功,poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号,将会从WAL最小版本号消费; -13. 
数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似; +本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索。 + +从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。 +由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。 ## 主要数据结构和 API diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 03285edcf7..791fe34440 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int32_t tsS3BlockSize = 4096; // number of tsdb pages +int32_t tsS3BlockCacheSize = 16; // number of blocks + int32_t tsCheckpointInterval = 20; #ifndef _STORAGE @@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { return 0; } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -655,6 +660,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; // min free disk space used to check if the disk is full [50MB, 1GB] if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, @@ -1070,6 +1077,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32; tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64; + tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; + tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + GRANT_CFG_GET; return 0; } @@ -1643,6 +1653,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "s3BlockSize") == 0) { + int32_t newS3BlockSize = atoi(value); + uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize); + tsS3BlockSize = newS3BlockSize; + return; + } + + if (strcasecmp(option, "s3BlockCacheSize") == 0) { + int32_t newS3BlockCacheSize = atoi(value); + uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize); + tsS3BlockCacheSize = newS3BlockCacheSize; + return; + } + if (strcasecmp(option, "keepTimeOffset") == 0) { int32_t newKeepTimeOffset = atoi(value); uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab6a7fb88b..edcce83a05 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -380,6 +380,8 @@ struct STsdb { TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; + SLRUCache *bCache; + TdThreadMutex bMutex; struct STFileSystem *pFS; // new SRocksCache rCache; }; @@ -643,13 +645,19 @@ struct SRowMerger { }; typedef struct { - char *path; - int32_t szPage; - int32_t flag; - TdFilePtr pFD; - int64_t pgno; - uint8_t *pBuf; - int64_t szFile; + char 
*path; + int32_t szPage; + int32_t flag; + TdFilePtr pFD; + int64_t pgno; + uint8_t *pBuf; + int64_t szFile; + STsdb *pTsdb; + const char *objName; + uint8_t s3File; + int32_t fid; + int64_t cid; + int64_t blkno; } STsdbFD; struct SDelFWriter { @@ -716,9 +724,9 @@ typedef struct SSttBlockLoadCostInfo { } SSttBlockLoadCostInfo; typedef struct SSttBlockLoadInfo { - SBlockData blockData[2]; // buffered block data - int32_t statisBlockIndex; // buffered statistics block index - void *statisBlock; // buffered statistics block data + SBlockData blockData[2]; // buffered block data + int32_t statisBlockIndex; // buffered statistics block index + void *statisBlock; // buffered statistics block data void *pSttStatisBlkArray; SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. @@ -861,6 +869,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle); +int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h); + int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 51d214518a..bb4d284f0e 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -22,15 +22,21 @@ extern "C" { #endif -extern int8_t tsS3Enabled; +#define S3_BLOCK_CACHE + +extern int8_t tsS3Enabled; +extern int32_t tsS3BlockSize; +extern int32_t tsS3BlockCacheSize; int32_t s3Init(); void s3CleanUp(); int32_t s3PutObjectFromFile(const char *file, const char *object); +int32_t s3PutObjectFromFile2(const char *file, const char *object); void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 89bdc085a3..38fbf42915 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -16,6 +16,7 @@ #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" +#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) @@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +static int32_t tsdbOpenBCache(STsdb *pTsdb) { + int32_t code = 0; + // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + + SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + taosLRUCacheSetStrictCapacity(pCache, false); + + taosThreadMutexInit(&pTsdb->bMutex, NULL); + +_err: + pTsdb->bCache = pCache; + return code; +} + +static void tsdbCloseBCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->bCache; + if (pCache) { + int32_t elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); 
+ taosLRUCacheEraseUnrefEntries(pCache); + elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + + taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->bMutex); + } +} + #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) typedef struct { @@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenBCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tsdbOpenRocksCache(pTsdb); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) { } tsdbCloseBICache(pTsdb); + tsdbCloseBCache(pTsdb); tsdbCloseRocksCache(pTsdb); } @@ -2987,3 +3030,100 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { return code; } + +// block cache +static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) { + struct { + int32_t fid; + int64_t commitID; + int64_t blkno; + } bKey = {0}; + + bKey.fid = fid; + bKey.commitID = commitID; + bKey.blkno = blkno; + + *len = sizeof(bKey); + memcpy(key, &bKey, *len); +} + +static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { + int32_t code = 0; + /* + uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); + if (pBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + */ + int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + // int64_t size = 4096; + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); + if (code != TSDB_CODE_SUCCESS) { + // taosMemoryFree(pBlock); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + //*ppBlock = pBlock; + + tsdbTrace("block:%p load from s3", *ppBlock); + +_exit: + return code; +} + +static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) { + (void)ud; + uint8_t *pBlock = (uint8_t *)value; + + taosMemoryFree(pBlock); +} + +int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) { + int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; + + getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STsdb *pTsdb = pFD->pTsdb; + taosThreadMutexLock(&pTsdb->bMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + uint8_t *pBlock = NULL; + code = tsdbCacheLoadBlockS3(pFD, &pBlock); + // if table's empty or error, return code of -1 + if (code != TSDB_CODE_SUCCESS || pBlock == NULL) { + taosThreadMutexUnlock(&pTsdb->bMutex); + + *handle = NULL; + return 0; + } + + size_t charge = tsS3BlockSize * pFD->szPage; + _taos_lru_deleter_t deleter = deleteBCache; + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->bMutex); + } + + *handle = h; + + return code; +} + +int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 7e5eb2c553..6e4cb517ff 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (fname) { for (int32_t i = 0; i < 
TSDB_FTYPE_MAX; ++i) { if (fname[i]) { - code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (config->files[i].exist) { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, &config->files[i].file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1466,7 +1466,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { } tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); if (writer->files[ftype].size == 0) { @@ -1634,7 +1634,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -1684,4 +1684,4 @@ _exit: TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index e768f68b15..da2445dee5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -31,7 +31,7 @@ typedef struct SFDataPtr { int64_t size; } SFDataPtr; -extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); +extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD); extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size); @@ -41,4 +41,4 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD); } #endif -#endif /*_TD_TSDB_DEF_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_DEF_H_*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 974b7f1b76..c143bb8a72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { const char *object_name = taosDirEntryBaseName((char *)path); long s3_size = tsS3Enabled ? 
s3Size(object_name) : 0; if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) { +#ifndef S3_BLOCK_CACHE s3EvictCache(path, s3_size); s3Get(object_name, path); @@ -38,6 +39,14 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { // taosMemoryFree(pFD); goto _exit; } +#else + pFD->s3File = 1; + pFD->pFD = (TdFilePtr)&pFD->s3File; + int32_t vid = 0; + sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid); + pFD->objName = object_name; + // pFD->szFile = s3_size; +#endif } else { code = TAOS_SYSTEM_ERROR(errsv); // taosMemoryFree(pFD); @@ -72,9 +81,10 @@ _exit: } // =============== PAGE-WISE FILE =============== -int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { +int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; STsdbFD *pFD = NULL; + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; *ppFD = NULL; @@ -90,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p pFD->flag = flag; pFD->szPage = szPage; pFD->pgno = 0; + pFD->pTsdb = pTsdb; *ppFD = pFD; @@ -101,7 +112,9 @@ void tsdbCloseFile(STsdbFD **ppFD) { STsdbFD *pFD = *ppFD; if (pFD) { taosMemoryFree(pFD->pBuf); - taosCloseFile(&pFD->pFD); + if (!pFD->s3File) { + taosCloseFile(&pFD->pFD); + } taosMemoryFree(pFD); *ppFD = NULL; } @@ -153,22 +166,41 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { } } - // seek int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); - int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - // read - n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } else if (n < pFD->szPage) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; + if (pFD->s3File) { + LRUHandle *handle = NULL; + + pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize; + int32_t code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle); + if (code != TSDB_CODE_SUCCESS || handle == NULL) { + tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + goto _exit; + } + + uint8_t *pBlock = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->bCache, handle); + + int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage); + + tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + } else { + // seek + int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + // read + n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < pFD->szPage) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } } // check @@ -293,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS // head flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE); @@ -307,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); - code = tsdbOpenFile(fname, szPage, flag, 
&pWriter->pDataFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD); if (code) goto _err; if (pWriter->fData.size == 0) { code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE); @@ -322,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD); if (code) goto _err; if (pWriter->fSma.size == 0) { code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); @@ -335,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0); flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; @@ -907,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS // head tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD); TSDB_CHECK_CODE(code, lino, _exit); // data tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD); TSDB_CHECK_CODE(code, lino, _exit); // sma tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD); TSDB_CHECK_CODE(code, lino, _exit); // stt for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1323,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb pDelFWriter->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, - &pDelFWriter->pWriteH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); TSDB_CHECK_CODE(code, lino, _exit); // update header @@ -1498,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb pDelFReader->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH); if (code) { taosMemoryFree(pDelFReader); goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 61be14f9bc..cb53876d97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -114,7 +114,7 @@ static int32_t 
tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile TSDB_CHECK_CODE(code, lino, _exit); char *object_name = taosDirEntryBaseName(fname); - code = s3PutObjectFromFile(from->fname, object_name); + code = s3PutObjectFromFile2(from->fname, object_name); TSDB_CHECK_CODE(code, lino, _exit); taosCloseFile(&fdFrom); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 27fae9dc6e..fa8d2d5ba4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con // open file if (fname) { - code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } else { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, config->file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } @@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { char fname[TSDB_FILENAME_LEN]; tsdbTFileName(writer->config->tsdb, writer->file, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -984,4 +984,4 @@ _exit: return code; } -bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } \ No newline at end of file +bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 3b38a0ae45..0884c32385 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * char fname[TSDB_FILENAME_LEN]; tsdbTFileName(tsdb, &file, fname); - code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); // convert @@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade code = tsdbTFileObjInit(tsdb, &file, &fobj); TSDB_CHECK_CODE(code, lino, _exit1); - code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit1); for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) { @@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f } char fname[TSDB_FILENAME_LEN] = {0}; - code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize, - TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); + code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -633,4 +632,4 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { taosRemoveFile(fname); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c 
b/source/dnode/vnode/src/vnd/vnodeCos.c index 7e95a55077..e6c3b87e94 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -59,17 +59,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { cos_request_options_t *options = NULL; cos_string_t bucket, object, file; cos_table_t *resp_headers; - int traffic_limit = 0; + // int traffic_limit = 0; cos_pool_create(&p, NULL); options = cos_request_options_create(p); s3InitRequestOptions(options, is_cname); cos_table_t *headers = NULL; + /* if (traffic_limit) { // 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误 headers = cos_table_make(p, 1); cos_table_add_int(headers, "x-cos-traffic-limit", 819200); } + */ cos_str_set(&bucket, tsS3BucketName); cos_str_set(&file, file_str); cos_str_set(&object, object_str); @@ -85,6 +87,48 @@ return code; } +int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { + int32_t code = 0; + cos_pool_t *p = NULL; + int is_cname = 0; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; + cos_string_t bucket, object, file; + cos_table_t *resp_headers; + int traffic_limit = 0; + cos_table_t *headers = NULL; + cos_resumable_clt_params_t *clt_params = NULL; + + cos_pool_create(&p, NULL); + options = cos_request_options_create(p); + s3InitRequestOptions(options, is_cname); + headers = cos_table_make(p, 0); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&file, file_str); + cos_str_set(&object, object_str); + + // upload + clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL); + s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL); + + if (!cos_status_is_ok(s)) { + vError("s3: %s", s->error_msg); + vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + code = terrno; + return code; + } + + log_status(s); + + cos_pool_destroy(p); + + if (s->code != 200) { + return code = s->code; + } + + return code; +} + void s3DeleteObjectsByPrefix(const char *prefix_str) { cos_pool_t *p = NULL; cos_request_options_t *options = NULL; @@ -217,6 +261,77 @@ bool s3Get(const char *object_name, const char *path) { return ret; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { + int32_t code = 0; + cos_pool_t *p = NULL; + int is_cname = 0; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; + cos_string_t bucket; + cos_string_t object; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; + cos_buf_t *content = NULL; + // cos_string_t file; + // int traffic_limit = 0; + char range_buf[64]; + + // create a memory pool + cos_pool_create(&p, NULL); + + // initialize request options + options = cos_request_options_create(p); + // init_test_request_options(options, is_cname); + s3InitRequestOptions(options, is_cname); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&object, object_name); + cos_list_t download_buffer; + cos_list_init(&download_buffer); + /* + if (traffic_limit) { + // the traffic limit can be set between 819200 and 838860800; the default unit is bit/s, i.e. 800Kb/s - 800Mb/s; values outside this range return a 400 error + headers = cos_table_make(p, 1); + cos_table_add_int(headers, "x-cos-traffic-limit", 819200); + } + */ + + headers = cos_table_create_if_null(options, headers, 1); + apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset, + offset + block_size - 1); + apr_table_add(headers, COS_RANGE, range_buf); + + s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers); + if (!cos_status_is_ok(s)) { + vError("s3: %s", s->error_msg); + vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + code = terrno; + return code; + } + + log_status(s); + // print_headers(resp_headers); + int64_t len = 0; + int64_t size = 0; + int64_t pos = 0; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); } + // char *buf = cos_pcalloc(p, (apr_size_t)(len + 1)); + char *buf = taosMemoryCalloc(1, (apr_size_t)(len)); + // buf[len] = '\0'; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { + size = cos_buf_size(content); + memcpy(buf + pos, content->pos, (size_t)size); + pos += size; + } + // cos_warn_log("Download data=%s", buf); + + // destroy the memory pool
 + cos_pool_destroy(p); + + *ppBlock = buf; + + return code; +} + typedef struct { int64_t size; int32_t atime; @@ -333,10 +448,12 @@ long s3Size(const char *object_name) { int32_t s3Init() { return 0; } void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } +int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; } void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 44d39392a2..6c537d7b98 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -75,7 +75,7 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32 double v = 0; GET_TYPED_DATA(v, double, pVar->nType, &pVar->d); colDataSetVal(pDst, rowIndex, (char*)&v, isNull); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type) || pDst->info.type == TSDB_DATA_TYPE_BOOL) { int64_t v = 0; GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); colDataSetVal(pDst, rowIndex, (char*)&v, isNull); @@ -85,7 +85,10 @@ colDataSetVal(pDst, rowIndex, (char*)&v, isNull); } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataSetVal(pDst, rowIndex, (const char*)&currentKey, isNull); - } else { // varchar/nchar data + } else if (pDst->info.type == TSDB_DATA_TYPE_NCHAR || pDst->info.type == TSDB_DATA_TYPE_VARCHAR || + pDst->info.type == TSDB_DATA_TYPE_VARBINARY) { + colDataSetVal(pDst, rowIndex, pVar->pz, isNull); + } else { // other data types colDataSetNULL(pDst, rowIndex); } } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c6432092d3..9ae6ab8cc4 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -338,6 +338,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hyperloglog.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/irate.py ,,y,system-test,./pytest.sh
python3 ./test.py -f 2-query/irate.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join.py @@ -569,6 +570,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 2 @@ -698,6 +700,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 3 @@ -794,6 +797,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -R diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py new file mode 100644 index 0000000000..f5cd2d5855 --- /dev/null +++ b/tests/system-test/2-query/fill.py @@ -0,0 +1,262 @@ +import taos +import sys + +from util.log import * +from util.sql import * +from util.cases import * + + + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to execute {__file__}") + #tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + dbname = "db" + tbname = "tb" + + tdSql.prepare() + + tdLog.printNoPrefix("==========step1:create table") + + tdSql.execute( + f'''create table if not exists {dbname}.{tbname} + (ts timestamp, c0 int, c1 bool, c2 varchar(100), c3 nchar(100), c4 varbinary(100)) + ''' + ) + + tdLog.printNoPrefix("==========step2:insert data") + + tdSql.execute(f"use db") + + tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:05', 5, true, 'varchar', 'nchar', 'varbinary')") + tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:10', 10, true, 'varchar', 'nchar', 'varbinary')") + tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:15', 15, NULL, NULL, NULL, NULL)") + + tdLog.printNoPrefix("==========step3:fill data") + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, 'xx');") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 9) + tdSql.checkData(1, 0, 5) + tdSql.checkData(2, 0, 10) + tdSql.checkData(3, 0, 15) + tdSql.checkData(4, 0, 9) + tdSql.checkData(0, 1, False) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, False) + tdSql.checkData(4, 1, False) + +
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, True);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, True) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, True) + tdSql.checkData(4, 1, True) + + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, False);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, False) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, False) + tdSql.checkData(4, 1, False) + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, 'abc');") + tdSql.checkRows(5) + tdSql.checkData(0, 1, "abc") + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, "abc") + tdSql.checkData(4, 1, "abc") + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是#$^中文');") + tdSql.checkRows(5) + tdSql.checkData(0, 1, '我是#$^中文') + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, "我是#$^中文") + tdSql.checkData(4, 1, "我是#$^中文") + + tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是#$^中文');") + tdSql.checkRows(5) + tdSql.checkData(0, 1, '我是#$^中文') + tdSql.checkData(1, 1, 'nchar') + tdSql.checkData(2, 1, 'nchar') + tdSql.checkData(3, 1, "我是#$^中文") + tdSql.checkData(4, 1, "我是#$^中文") + + tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是中文');") + tdSql.checkRows(5) + tdSql.checkData(0, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87') + tdSql.checkData(1, 1, b'varbinary') + tdSql.checkData(2, 1, b'varbinary') + tdSql.checkData(3, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87') + tdSql.checkData(4, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87') + + tdLog.printNoPrefix("==========step4:fill null") + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, NULL, NULL);") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, 5) + tdSql.checkData(2, 0, 10) + tdSql.checkData(3, 0, 15) + tdSql.checkData(4, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 
00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, 'nchar') + tdSql.checkData(2, 1, 'nchar') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, b'varbinary') + tdSql.checkData(2, 1, b'varbinary') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdLog.printNoPrefix("==========step5:fill prev") + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, 5) + tdSql.checkData(2, 0, 10) + tdSql.checkData(3, 0, 15) + tdSql.checkData(4, 0, 15) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, True) + tdSql.checkData(4, 1, True) + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, 'varchar') + tdSql.checkData(4, 1, 'varchar') + + tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, 'nchar') + tdSql.checkData(2, 1, 'nchar') + tdSql.checkData(3, 1, 'nchar') + tdSql.checkData(4, 1, 'nchar') + + tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 1, b'varbinary') + tdSql.checkData(2, 1, b'varbinary') + tdSql.checkData(3, 1, b'varbinary') + tdSql.checkData(4, 1, b'varbinary') + + tdLog.printNoPrefix("==========step6:fill next") + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 5) + tdSql.checkData(1, 0, 5) + tdSql.checkData(2, 0, 10) + tdSql.checkData(3, 0, 15) + tdSql.checkData(4, 0, None) + tdSql.checkData(0, 1, True) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 'varchar') + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 'nchar') + tdSql.checkData(1, 1, 'nchar') + tdSql.checkData(2, 1, 'nchar') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + 
tdSql.checkData(0, 1, b'varbinary') + tdSql.checkData(1, 1, b'varbinary') + tdSql.checkData(2, 1, b'varbinary') + tdSql.checkData(3, 1, None) + tdSql.checkData(4, 1, None) + + tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:20', 15, False, '中文', '中文', '中文');") + tdLog.printNoPrefix("==========step7:fill next") + tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, True) + tdSql.checkData(1, 1, True) + tdSql.checkData(2, 1, True) + tdSql.checkData(3, 1, False) + tdSql.checkData(4, 1, False) + + tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 'varchar') + tdSql.checkData(1, 1, 'varchar') + tdSql.checkData(2, 1, 'varchar') + tdSql.checkData(3, 1, '中文') + tdSql.checkData(4, 1, '中文') + + tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 'nchar') + tdSql.checkData(1, 1, 'nchar') + tdSql.checkData(2, 1, 'nchar') + tdSql.checkData(3, 1, '中文') + tdSql.checkData(4, 1, '中文') + + tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, b'varbinary') + tdSql.checkData(1, 1, b'varbinary') + tdSql.checkData(2, 1, b'varbinary') + tdSql.checkData(3, 1, b'\xe4\xb8\xad\xe6\x96\x87') + tdSql.checkData(4, 1, b'\xe4\xb8\xad\xe6\x96\x87') + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index c2eb7bee2e..86d010209d 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -375,6 +375,22 @@ class TDTestCase: tdSql.checkData(11, 0, True) tdSql.checkData(12, 0, True) + tdSql.query(f"select interp(c6) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(value, false)") + tdSql.checkRows(13) + tdSql.checkData(0, 0, False) + tdSql.checkData(1, 0, True) + tdSql.checkData(2, 0, False) + tdSql.checkData(3, 0, False) + tdSql.checkData(4, 0, False) + tdSql.checkData(5, 0, False) + tdSql.checkData(6, 0, True) + tdSql.checkData(7, 0, False) + tdSql.checkData(8, 0, False) + tdSql.checkData(9, 0, False) + tdSql.checkData(10, 0, False) + tdSql.checkData(11, 0, True) + tdSql.checkData(12, 0, False) + tdSql.query(f"select interp(c6) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(value, NULL)") tdSql.checkRows(13) tdSql.checkData(0, 0, None) diff --git a/tests/system-test/2-query/limit.py b/tests/system-test/2-query/limit.py index 4774602d69..fb5595a8be 100644 --- a/tests/system-test/2-query/limit.py +++ b/tests/system-test/2-query/limit.py @@ -291,8 +291,8 @@ class TDTestCase: tdSql.checkData(0, 7, 1) tdSql.checkData(0, 8, "binary5") tdSql.checkData(0, 9, "nchar5") - tdSql.checkData(1, 8, None) - tdSql.checkData(1, 9, None) + tdSql.checkData(1, 8, "-8") + tdSql.checkData(1, 9, "-9") limit = paraDict["rowsPerTbl"] diff --git a/tests/system-test/runAllOne.sh b/tests/system-test/runAllOne.sh index a870e36935..6d4c80c388
100644 --- a/tests/system-test/runAllOne.sh +++ b/tests/system-test/runAllOne.sh @@ -210,6 +210,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -P python3 ./test.py -f 2-query/hyperloglog.py -P -R python3 ./test.py -f 2-query/interp.py -P python3 ./test.py -f 2-query/interp.py -P -R +python3 ./test.py -f 2-query/fill.py -P python3 ./test.py -f 2-query/irate.py -P python3 ./test.py -f 2-query/irate.py -P -R python3 ./test.py -f 2-query/join.py -P @@ -450,6 +451,7 @@ python3 ./test.py -f 2-query/arccos.py -P -Q 2 python3 ./test.py -f 2-query/arctan.py -P -Q 2 python3 ./test.py -f 2-query/query_cols_tags_and_or.py -P -Q 2 python3 ./test.py -f 2-query/interp.py -P -Q 2 +python3 ./test.py -f 2-query/fill.py -P -Q 2 python3 ./test.py -f 2-query/nestedQueryInterval.py -P -Q 2 python3 ./test.py -f 2-query/stablity.py -P -Q 2 python3 ./test.py -f 2-query/stablity_1.py -P -Q 2 @@ -579,6 +581,7 @@ python3 ./test.py -f 2-query/last_row.py -P -Q 3 python3 ./test.py -f 2-query/tsbsQuery.py -P -Q 3 python3 ./test.py -f 2-query/sml.py -P -Q 3 python3 ./test.py -f 2-query/interp.py -P -Q 3 +python3 ./test.py -f 2-query/fill.py -P -Q 3 python3 ./test.py -f 2-query/case_when.py -P -Q 3 python3 ./test.py -f 2-query/blockSMA.py -P -Q 3 python3 ./test.py -f 2-query/projectionDesc.py -P -Q 3 @@ -675,6 +678,7 @@ python3 ./test.py -f 2-query/last_row.py -P -Q 4 python3 ./test.py -f 2-query/tsbsQuery.py -P -Q 4 python3 ./test.py -f 2-query/sml.py -P -Q 4 python3 ./test.py -f 2-query/interp.py -P -Q 4 +python3 ./test.py -f 2-query/fill.py -P -Q 4 python3 ./test.py -f 2-query/case_when.py -P -Q 4 python3 ./test.py -f 2-query/insert_select.py -P python3 ./test.py -f 2-query/insert_select.py -P -R diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 443c27fd7e..3daf65b406 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -271,6 +271,7 @@ python3 ./test.py -f 2-query/hyperloglog.py python3 ./test.py -f 2-query/hyperloglog.py -R python3 ./test.py -f 2-query/interp.py python3 ./test.py -f 2-query/interp.py -R +python3 ./test.py -f 2-query/fill.py python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/irate.py -R python3 ./test.py -f 2-query/join.py @@ -496,6 +497,7 @@ python3 ./test.py -f 2-query/arccos.py -Q 2 python3 ./test.py -f 2-query/arctan.py -Q 2 python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2 python3 ./test.py -f 2-query/interp.py -Q 2 +python3 ./test.py -f 2-query/fill.py -Q 2 python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 2 python3 ./test.py -f 2-query/stablity.py -Q 2 python3 ./test.py -f 2-query/stablity_1.py -Q 2 @@ -624,6 +626,7 @@ python3 ./test.py -f 2-query/last_row.py -Q 3 python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 python3 ./test.py -f 2-query/sml.py -Q 3 python3 ./test.py -f 2-query/interp.py -Q 3 +python3 ./test.py -f 2-query/fill.py -Q 3 python3 ./test.py -f 2-query/case_when.py -Q 3 python3 ./test.py -f 2-query/blockSMA.py -Q 3 python3 ./test.py -f 2-query/projectionDesc.py -Q 3 @@ -717,6 +720,7 @@ python3 ./test.py -f 2-query/last_row.py -Q 4 python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 python3 ./test.py -f 2-query/sml.py -Q 4 python3 ./test.py -f 2-query/interp.py -Q 4 +python3 ./test.py -f 2-query/fill.py -Q 4 python3 ./test.py -f 2-query/case_when.py -Q 4 python3 ./test.py -f 2-query/insert_select.py python3 ./test.py -f 2-query/insert_select.py -R