Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-26174

This commit is contained in:
liuyao 2023-09-22 19:25:41 +08:00
commit 0d983e28ef
20 changed files with 729 additions and 90 deletions

View File

@ -23,20 +23,30 @@ By subscribing to a topic, a consumer can obtain the latest data in that topic i
To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
Tips:(c interface for example)
1. A consumption group consumes all data under the same topic, and different consumption groups are independent of each other;
2. A consumption group consumes all vgroups of the same topic, which can be composed of multiple consumers, but a vgroup is only consumed by one consumer. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume data;
3. On the server side, only one offset is saved for each vgroup, and the offsets for each vgroup are monotonically increasing, but not necessarily continuous. There is no correlation between the offsets of various vgroups;
4. Each poll server will return a result block, which belongs to a vgroup and may contain data from multiple versions of wal. This block can be accessed through tmq_get_vgroup_offset. The offset interface obtains the offset of the first record in the block;
5. If a consumer group has never committed an offset, when its member consumers restart and pull data again, they start consuming from the set value of the parameter auto.offset.reset; In a consumer lifecycle, the client locally records the offset of the most recent pull data and will not pull duplicate data;
6. If a consumer terminates abnormally (without calling tmq_close), they need to wait for about 12 seconds to trigger their consumer group rebalance. The consumer's status on the server will change to LOST, and after about 1 day, the consumer will be automatically deleted; Exit normally, and after exiting, the consumer will be deleted; Add a new consumer, wait for about 2 seconds to trigger Rebalance, and the consumer's status on the server will change to ready;
7. The consumer group Rebalance will reassign Vgroups to all consumer members in the ready state of the group, and consumers can only assign/see/commit/poll operations to the Vgroups they are responsible for;
8. Consumers can tmq_position to obtain the offset of the current consumption, seek to the specified offset, and consume again;
9. Seek points the position to the specified offset without executing the commit operation. Once the seek is successful, it can poll the specified offset and subsequent data;
10. Before the seek operation, tmq must be call tmq_get_topic_assignment, The assignment interface obtains the vgroup ID and offset range of the consumer. The seek operation will detect whether the vgroup ID and offset are legal, and if they are illegal, an error will be reported;
11. Due to the existence of a WAL expiration deletion mechanism, even if the seek operation is successful, it is possible that the offset has expired when polling data. If the offset of poll is less than the WAL minimum version number, it will be consumed from the WAL minimum version number;
12. The tmq_get_vgroup_offset interface obtains the offset of the first data in the result block where the record is located. When seeking to this offset, it will consume all the data in this block. Refer to point four;
13. Data subscription is to consume data from the wal. If some wal files are deleted according to WAL retention policy, the deleted data can't be consumed any more. So you need to set a reasonable value for parameter `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consume the data in a timely way to make sure there is no data loss. This behavior is similar to Kafka and other widely used message queue products.
The following are some explanations about data subscription, which require some understanding of the architecture of TDengine and the use of various language linker interfaces.
- A consumption group consumes all data under the same topic, and different consumption groups are independent of each other;
- A consumption group consumes all vgroups of the same topic, which can be composed of multiple consumers, but a vgroup is only consumed by one consumer. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume data;
- On the server side, only one offset is saved for each vgroup, and the offsets for each vgroup are monotonically increasing, but not necessarily continuous. There is no correlation between the offsets of various vgroups;
- Each poll server will return a result block, which belongs to a vgroup and may contain data from multiple versions of wal. This block can be accessed through offset interface. The offset interface obtains the offset of the first record in the block;
- If a consumer group has never committed an offset, when its member consumers restart and pull data again, they start consuming from the set value of the parameter auto.offset.reset; In a consumer lifecycle, the client locally records the offset of the most recent pull data and will not pull duplicate data;
- If a consumer terminates abnormally (without calling tmq_close), they need to wait for about 12 seconds to trigger their consumer group rebalance. The consumer's status on the server will change to LOST, and after about 1 day, the consumer will be automatically deleted; Exit normally, and after exiting, the consumer will be deleted; Add a new consumer, wait for about 2 seconds to trigger Rebalance, and the consumer's status on the server will change to ready;
- The consumer group Rebalance will reassign Vgroups to all consumer members in the ready state of the group, and consumers can only assign/see/commit/poll operations to the Vgroups they are responsible for;
- Consumers can call position interface to obtain the offset of the current consumption, seek to the specified offset, and consume again;
- Seek points the position to the specified offset without executing the commit operation. Once the seek is successful, it can poll the specified offset and subsequent data;
- Position is to obtain the current consumption position, which is the position to be taken next time, not the current consumption position
- Commit is the submission of the consumption location. Without parameters, it is the submission of the current consumption location (the location to be taken next time, not the current consumption location). With parameters, it is the location in the submission parameters (i.e. the location to be taken after the next exit and restart)
- Seek is to set the consumer's consumption position. Wherever the seek goes, the position will be returned, all of which are the positions to be taken next time
- Seek does not affect commit, commit does not affect seek, independent of each other, the two are different concepts
- The begin interface is the offset of the first data in wal, and the end interface is the offset+1 of the last data in wal10.
- Before the seek operation, tmq must be call assignment interface, The assignment interface obtains the vgroup ID and offset range of the consumer. The seek operation will detect whether the vgroup ID and offset are legal, and if they are illegal, an error will be reported;
- Due to the existence of a WAL expiration deletion mechanism, even if the seek operation is successful, it is possible that the offset has expired when polling data. If the offset of poll is less than the WAL minimum version number, it will be consumed from the WAL minimum version number;
- The offset interface obtains the offset of the first data in the result block where the record is located. When seeking to this offset, it will consume all the data in this block. Refer to point four;
- Data subscription is to consume data from the wal. If some wal files are deleted according to WAL retention policy, the deleted data can't be consumed any more. So you need to set a reasonable value for parameter `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` when creating the database and make sure your application consume the data in a timely way to make sure there is no data loss. This behavior is similar to Kafka and other widely used message queue products.
This document does not provide any further introduction to the knowledge of message queues themselves. If you need to know more, please search for it yourself.
Starting from version 3.2.0.0, data subscription supports vnode migration and splitting.
Due to the dependence of data subscription on wal files, wal does not synchronize during vnode migration and splitting. Therefore, after migration or splitting, wal data that has not been consumed before cannot be consumed. So please ensure that all data has been consumed before proceeding with vnode migration or splitting, otherwise data loss may occur during consumption.
## Data Schema and API

View File

@ -23,22 +23,30 @@ import CDemo from "./_sub_c.mdx";
为了实现上述功能TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
下面为关于数据订阅的一些说明需要对TDengine的架构有一些了解结合各个语言链接器的接口使用。
- 一个消费组消费同一个topic下的所有数据不同消费组之间相互独立
- 一个消费组消费同一个topic所有的vgroup消费组可由多个消费者组成但一个vgroup仅被一个消费者消费如果消费者数量超过了vgroup数量多余的消费者不消费数据
- 在服务端每个vgroup仅保存一个offset每个vgroup的offset是单调递增的但不一定连续。各个vgroup的offset之间没有关联
- 每次poll服务端会返回一个结果block该block属于一个vgroup可能包含多个wal版本的数据可以通过 offset 接口获得是该block第一条记录的offset
- 一个消费组如果从未commit过offset当其成员消费者重启重新拉取数据时均从参数auto.offset.reset设定值开始消费在一个消费者生命周期中客户端本地记录了最近一次拉取数据的offset不会拉取重复数据
- 消费者如果异常终止没有调用tmq_close需等约12秒后触发其所属消费组rebalance该消费者在服务端状态变为LOST约1天后该消费者自动被删除正常退出退出后就会删除消费者新增消费者需等约2秒触发rebalance该消费者在服务端状态变为ready
- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作
- 消费者可利用 position 获得当前消费的offset并seek到指定offset重新消费
- seek将position指向指定offset不执行commit操作一旦seek成功可poll拉取指定offset及以后的数据
- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法如非法将报错
- position是获取当前的消费位置是下次要取的位置不是当前消费到的位置
- commit是提交消费位置不带参数的话是提交当前消费位置下次要取的位置不是当前消费到的位置带参数的话是提交参数里的位置也即下次退出重启后要取的位置
- seek是设置consumer消费位置seek到哪position就返回哪都是下次要取的位置
- seek不会影响commitcommit不影响seek相互独立两个是不同的概念
- begin接口为wal 第一条数据的offsetend 接口为wal 最后一条数据的offset + 1
- offset接口获取的是记录所在结果block块里的第一条数据的offset当seek至该offset时将消费到这个block里的全部数据。参见第四点
- 由于存在 WAL 过期删除机制即使seek 操作成功poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号将会从WAL最小版本号消费
- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;
说明以c接口为例
1. 一个消费组消费同一个topic下的所有数据不同消费组之间相互独立
2. 一个消费组消费同一个topic所有的vgroup消费组可由多个消费者组成但一个vgroup仅被一个消费者消费如果消费者数量超过了vgroup数量多余的消费者不消费数据
3. 在服务端每个vgroup仅保存一个offset每个vgroup的offset是单调递增的但不一定连续。各个vgroup的offset之间没有关联
4. 每次poll服务端会返回一个结果block该block属于一个vgroup可能包含多个wal版本的数据可以通过 tmq_get_vgroup_offset 接口获得是该block第一条记录的offset
5. 一个消费组如果从未commit过offset当其成员消费者重启重新拉取数据时均从参数auto.offset.reset设定值开始消费在一个消费者生命周期中客户端本地记录了最近一次拉取数据的offset不会拉取重复数据
6. 消费者如果异常终止没有调用tmq_close需等约12秒后触发其所属消费组rebalance该消费者在服务端状态变为LOST约1天后该消费者自动被删除正常退出退出后就会删除消费者新增消费者需等约2秒触发rebalance该消费者在服务端状态变为ready
7. 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作
8. 消费者可利用 tmq_position 获得当前消费的offset并seek到指定offset重新消费
9. seek将position指向指定offset不执行commit操作一旦seek成功可poll拉取指定offset及以后的数据
10. seek 操作之前须调用 tmq_get_topic_assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法如非法将报错
11. tmq_get_vgroup_offset接口获取的是记录所在结果block块里的第一条数据的offset当seek至该offset时将消费到这个block里的全部数据。参见第四点
12. 由于存在 WAL 过期删除机制即使seek 操作成功poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号将会从WAL最小版本号消费
13. 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;
本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索。
从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
## 主要数据结构和 API

View File

@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false;
int32_t tsS3BlockSize = 4096; // number of tsdb pages
int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsCheckpointInterval = 20;
#ifndef _STORAGE
@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
return 0;
}
struct SConfig *taosGetCfg() { return tsCfg; }
struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
@ -655,6 +660,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
// min free disk space used to check if the disk is full [50MB, 1GB]
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
@ -1070,6 +1077,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32;
tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64;
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
GRANT_CFG_GET;
return 0;
}
@ -1643,6 +1653,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
return;
}
if (strcasecmp(option, "s3BlockSize") == 0) {
int32_t newS3BlockSize = atoi(value);
uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize);
tsS3BlockSize = newS3BlockSize;
return;
}
if (strcasecmp(option, "s3BlockCacheSize") == 0) {
int32_t newS3BlockCacheSize = atoi(value);
uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize);
tsS3BlockCacheSize = newS3BlockCacheSize;
return;
}
if (strcasecmp(option, "keepTimeOffset") == 0) {
int32_t newKeepTimeOffset = atoi(value);
uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset);

View File

@ -380,6 +380,8 @@ struct STsdb {
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
SLRUCache *bCache;
TdThreadMutex bMutex;
struct STFileSystem *pFS; // new
SRocksCache rCache;
};
@ -643,13 +645,19 @@ struct SRowMerger {
};
typedef struct {
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
STsdb *pTsdb;
const char *objName;
uint8_t s3File;
int32_t fid;
int64_t cid;
int64_t blkno;
} STsdbFD;
struct SDelFWriter {
@ -716,9 +724,9 @@ typedef struct SSttBlockLoadCostInfo {
} SSttBlockLoadCostInfo;
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
SBlockData blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
void *pSttStatisBlkArray;
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
@ -861,6 +869,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

View File

@ -22,15 +22,21 @@
extern "C" {
#endif
extern int8_t tsS3Enabled;
#define S3_BLOCK_CACHE
extern int8_t tsS3Enabled;
extern int32_t tsS3BlockSize;
extern int32_t tsS3BlockCacheSize;
int32_t s3Init();
void s3CleanUp();
int32_t s3PutObjectFromFile(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object);
void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);

View File

@ -16,6 +16,7 @@
#include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
#include "vnd.h"
#include "vndCos.h"
#define ROCKS_BATCH_SIZE (4096)
@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
}
}
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
int32_t code = 0;
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->bMutex, NULL);
_err:
pTsdb->bCache = pCache;
return code;
}
static void tsdbCloseBCache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->bCache;
if (pCache) {
int32_t elems = taosLRUCacheGetElems(pCache);
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
taosLRUCacheEraseUnrefEntries(pCache);
elems = taosLRUCacheGetElems(pCache);
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->bMutex);
}
}
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
typedef struct {
@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}
code = tsdbOpenBCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbOpenRocksCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
}
tsdbCloseBICache(pTsdb);
tsdbCloseBCache(pTsdb);
tsdbCloseRocksCache(pTsdb);
}
@ -2987,3 +3030,100 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
return code;
}
// block cache
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
struct {
int32_t fid;
int64_t commitID;
int64_t blkno;
} bKey = {0};
bKey.fid = fid;
bKey.commitID = commitID;
bKey.blkno = blkno;
*len = sizeof(bKey);
memcpy(key, &bKey, *len);
}
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
int32_t code = 0;
/*
uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage);
if (pBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*/
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
// int64_t size = 4096;
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFree(pBlock);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
//*ppBlock = pBlock;
tsdbTrace("block:%p load from s3", *ppBlock);
_exit:
return code;
}
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
(void)ud;
uint8_t *pBlock = (uint8_t *)value;
taosMemoryFree(pBlock);
}
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pFD->pTsdb;
taosThreadMutexLock(&pTsdb->bMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
uint8_t *pBlock = NULL;
code = tsdbCacheLoadBlockS3(pFD, &pBlock);
// if table's empty or error, return code of -1
if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
taosThreadMutexUnlock(&pTsdb->bMutex);
*handle = NULL;
return 0;
}
size_t charge = tsS3BlockSize * pFD->szPage;
_taos_lru_deleter_t deleter = deleteBCache;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->bMutex);
}
*handle = h;
return code;
}
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}

View File

@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
if (fname) {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (fname[i]) {
code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
if (config->files[i].exist) {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -1466,7 +1466,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
}
tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->files[ftype].size == 0) {
@ -1634,7 +1634,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
@ -1684,4 +1684,4 @@ _exit:
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
}

View File

@ -31,7 +31,7 @@ typedef struct SFDataPtr {
int64_t size;
} SFDataPtr;
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
@ -41,4 +41,4 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD);
}
#endif
#endif /*_TD_TSDB_DEF_H_*/
#endif /*_TD_TSDB_DEF_H_*/

View File

@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
const char *object_name = taosDirEntryBaseName((char *)path);
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
#ifndef S3_BLOCK_CACHE
s3EvictCache(path, s3_size);
s3Get(object_name, path);
@ -38,6 +39,14 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
// taosMemoryFree(pFD);
goto _exit;
}
#else
pFD->s3File = 1;
pFD->pFD = (TdFilePtr)&pFD->s3File;
int32_t vid = 0;
sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);
pFD->objName = object_name;
// pFD->szFile = s3_size;
#endif
} else {
code = TAOS_SYSTEM_ERROR(errsv);
// taosMemoryFree(pFD);
@ -72,9 +81,10 @@ _exit:
}
// =============== PAGE-WISE FILE ===============
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD = NULL;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
*ppFD = NULL;
@ -90,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
pFD->flag = flag;
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->pTsdb = pTsdb;
*ppFD = pFD;
@ -101,7 +112,9 @@ void tsdbCloseFile(STsdbFD **ppFD) {
STsdbFD *pFD = *ppFD;
if (pFD) {
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
if (!pFD->s3File) {
taosCloseFile(&pFD->pFD);
}
taosMemoryFree(pFD);
*ppFD = NULL;
}
@ -153,22 +166,41 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
}
}
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < pFD->szPage) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
if (pFD->s3File) {
LRUHandle *handle = NULL;
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
int32_t code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
goto _exit;
}
uint8_t *pBlock = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->bCache, handle);
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
} else {
// seek
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < pFD->szPage) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
}
// check
@ -293,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
// head
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
@ -307,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD);
if (code) goto _err;
if (pWriter->fData.size == 0) {
code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
@ -322,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD);
if (code) goto _err;
if (pWriter->fSma.size == 0) {
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
@ -335,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0);
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
@ -907,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
// head
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD);
TSDB_CHECK_CODE(code, lino, _exit);
// data
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD);
TSDB_CHECK_CODE(code, lino, _exit);
// sma
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD);
TSDB_CHECK_CODE(code, lino, _exit);
// stt
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1323,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
pDelFWriter->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE,
&pDelFWriter->pWriteH);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
TSDB_CHECK_CODE(code, lino, _exit);
// update header
@ -1498,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
pDelFReader->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH);
if (code) {
taosMemoryFree(pDelFReader);
goto _exit;

View File

@ -114,7 +114,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
TSDB_CHECK_CODE(code, lino, _exit);
char *object_name = taosDirEntryBaseName(fname);
code = s3PutObjectFromFile(from->fname, object_name);
code = s3PutObjectFromFile2(from->fname, object_name);
TSDB_CHECK_CODE(code, lino, _exit);
taosCloseFile(&fdFrom);

View File

@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
// open file
if (fname) {
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, config->file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(writer->config->tsdb, writer->file, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
@ -984,4 +984,4 @@ _exit:
return code;
}
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }
bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; }

View File

@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(tsdb, &file, fname);
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit);
// convert
@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade
code = tsdbTFileObjInit(tsdb, &file, &fobj);
TSDB_CHECK_CODE(code, lino, _exit1);
code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit1);
for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) {
@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f
}
char fname[TSDB_FILENAME_LEN] = {0};
code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize,
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
@ -633,4 +632,4 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
taosRemoveFile(fname);
return 0;
}
}

View File

@ -59,17 +59,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t *resp_headers;
int traffic_limit = 0;
// int traffic_limit = 0;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_table_t *headers = NULL;
/*
if (traffic_limit) {
// 限速值设置范围为819200 - 838860800即100KB/s - 100MB/s如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
*/
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&file, file_str);
cos_str_set(&object, object_str);
@ -85,6 +87,48 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
return code;
}
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
int32_t code = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t *resp_headers;
int traffic_limit = 0;
cos_table_t *headers = NULL;
cos_resumable_clt_params_t *clt_params = NULL;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
headers = cos_table_make(p, 0);
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&file, file_str);
cos_str_set(&object, object_str);
// upload
clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL);
s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL);
if (!cos_status_is_ok(s)) {
vError("s3: %s", s->error_msg);
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
code = terrno;
return code;
}
log_status(s);
cos_pool_destroy(p);
if (s->code != 200) {
return code = s->code;
}
return code;
}
void s3DeleteObjectsByPrefix(const char *prefix_str) {
cos_pool_t *p = NULL;
cos_request_options_t *options = NULL;
@ -217,6 +261,77 @@ bool s3Get(const char *object_name, const char *path) {
return ret;
}
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) {
int32_t code = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_buf_t *content = NULL;
// cos_string_t file;
// int traffic_limit = 0;
char range_buf[64];
//创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
options = cos_request_options_create(p);
// init_test_request_options(options, is_cname);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&object, object_name);
cos_list_t download_buffer;
cos_list_init(&download_buffer);
/*
if (traffic_limit) {
// 限速值设置范围为819200 - 838860800单位默认为 bit/s即800Kb/s - 800Mb/s如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
*/
headers = cos_table_create_if_null(options, headers, 1);
apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset,
offset + block_size - 1);
apr_table_add(headers, COS_RANGE, range_buf);
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
if (!cos_status_is_ok(s)) {
vError("s3: %s", s->error_msg);
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
code = terrno;
return code;
}
log_status(s);
// print_headers(resp_headers);
int64_t len = 0;
int64_t size = 0;
int64_t pos = 0;
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); }
// char *buf = cos_pcalloc(p, (apr_size_t)(len + 1));
char *buf = taosMemoryCalloc(1, (apr_size_t)(len));
// buf[len] = '\0';
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) {
size = cos_buf_size(content);
memcpy(buf + pos, content->pos, (size_t)size);
pos += size;
}
// cos_warn_log("Download data=%s", buf);
//销毁内存池
cos_pool_destroy(p);
*ppBlock = buf;
return code;
}
typedef struct {
int64_t size;
int32_t atime;
@ -333,10 +448,12 @@ long s3Size(const char *object_name) {
int32_t s3Init() { return 0; }
void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; }
void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }

View File

@ -75,7 +75,7 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->d);
colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type) || pDst->info.type == TSDB_DATA_TYPE_BOOL) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
@ -85,7 +85,10 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32
colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataSetVal(pDst, rowIndex, (const char*)&currentKey, isNull);
} else { // varchar/nchar data
} else if (pDst->info.type == TSDB_DATA_TYPE_NCHAR || pDst->info.type == TSDB_DATA_TYPE_VARCHAR ||
pDst->info.type == TSDB_DATA_TYPE_VARBINARY) {
colDataSetVal(pDst, rowIndex, pVar->pz, isNull);
} else { // others data
colDataSetNULL(pDst, rowIndex);
}
}

View File

@ -338,6 +338,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hyperloglog.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/irate.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/irate.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/join.py
@ -569,6 +570,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 2
@ -698,6 +700,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 3
@ -794,6 +797,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -R

View File

@ -0,0 +1,262 @@
import taos
import sys
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
dbname = "db"
tbname = "tb"
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
tdSql.execute(
f'''create table if not exists {dbname}.{tbname}
(ts timestamp, c0 int, c1 bool, c2 varchar(100), c3 nchar(100), c4 varbinary(100))
'''
)
tdLog.printNoPrefix("==========step2:insert data")
tdSql.execute(f"use db")
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:05', 5, true, 'varchar', 'nchar', 'varbinary')")
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:10', 10, true, 'varchar', 'nchar', 'varbinary')")
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:15', 15, NULL, NULL, NULL, NULL)")
tdLog.printNoPrefix("==========step3:fill data")
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, 'xx');")
tdSql.checkRows(5)
tdSql.checkData(0, 0, 9)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
tdSql.checkData(3, 0, 15)
tdSql.checkData(4, 0, 9)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, False)
tdSql.checkData(4, 1, False)
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, True);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, True)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, True)
tdSql.checkData(4, 1, True)
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, False);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, False)
tdSql.checkData(4, 1, False)
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, 'abc');")
tdSql.checkRows(5)
tdSql.checkData(0, 1, "abc")
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, "abc")
tdSql.checkData(4, 1, "abc")
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是#$^中文');")
tdSql.checkRows(5)
tdSql.checkData(0, 1, '我是#$^中文')
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, "我是#$^中文")
tdSql.checkData(4, 1, "我是#$^中文")
tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是#$^中文');")
tdSql.checkRows(5)
tdSql.checkData(0, 1, '我是#$^中文')
tdSql.checkData(1, 1, 'nchar')
tdSql.checkData(2, 1, 'nchar')
tdSql.checkData(3, 1, "我是#$^中文")
tdSql.checkData(4, 1, "我是#$^中文")
tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, '我是中文');")
tdSql.checkRows(5)
tdSql.checkData(0, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87')
tdSql.checkData(1, 1, b'varbinary')
tdSql.checkData(2, 1, b'varbinary')
tdSql.checkData(3, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87')
tdSql.checkData(4, 1, b'\xe6\x88\x91\xe6\x98\xaf\xe4\xb8\xad\xe6\x96\x87')
tdLog.printNoPrefix("==========step4:fill null")
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, NULL, NULL);")
tdSql.checkRows(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
tdSql.checkData(3, 0, 15)
tdSql.checkData(4, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, 'nchar')
tdSql.checkData(2, 1, 'nchar')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(value, 9, NULL);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, b'varbinary')
tdSql.checkData(2, 1, b'varbinary')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdLog.printNoPrefix("==========step5:fill prev")
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);")
tdSql.checkRows(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
tdSql.checkData(3, 0, 15)
tdSql.checkData(4, 0, 15)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, True)
tdSql.checkData(4, 1, True)
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, 'varchar')
tdSql.checkData(4, 1, 'varchar')
tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, 'nchar')
tdSql.checkData(2, 1, 'nchar')
tdSql.checkData(3, 1, 'nchar')
tdSql.checkData(4, 1, 'nchar')
tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(PREV);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, b'varbinary')
tdSql.checkData(2, 1, b'varbinary')
tdSql.checkData(3, 1, b'varbinary')
tdSql.checkData(4, 1, b'varbinary')
tdLog.printNoPrefix("==========step6:fill next")
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
tdSql.checkData(3, 0, 15)
tdSql.checkData(4, 0, None)
tdSql.checkData(0, 1, True)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, 'varchar')
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, 'nchar')
tdSql.checkData(1, 1, 'nchar')
tdSql.checkData(2, 1, 'nchar')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, b'varbinary')
tdSql.checkData(1, 1, b'varbinary')
tdSql.checkData(2, 1, b'varbinary')
tdSql.checkData(3, 1, None)
tdSql.checkData(4, 1, None)
tdSql.execute(f"insert into {dbname}.{tbname} values ('2020-02-01 00:00:20', 15, False, '中文', '中文', '中文');")
tdLog.printNoPrefix("==========step6:fill next")
tdSql.query(f"select avg(c0), last(c1) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, True)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
tdSql.checkData(3, 1, False)
tdSql.checkData(4, 1, False)
tdSql.query(f"select avg(c0), last(c2) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, 'varchar')
tdSql.checkData(1, 1, 'varchar')
tdSql.checkData(2, 1, 'varchar')
tdSql.checkData(3, 1, '中文')
tdSql.checkData(4, 1, '中文')
tdSql.query(f"select avg(c0), last(c3) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, 'nchar')
tdSql.checkData(1, 1, 'nchar')
tdSql.checkData(2, 1, 'nchar')
tdSql.checkData(3, 1, '中文')
tdSql.checkData(4, 1, '中文')
tdSql.query(f"select avg(c0), last(c4) from {dbname}.{tbname} where ts>='2020-02-01 00:00:00' and ts<='2020-02-01 00:00:20' interval(5s) fill(NEXT);")
tdSql.checkRows(5)
tdSql.checkData(0, 1, b'varbinary')
tdSql.checkData(1, 1, b'varbinary')
tdSql.checkData(2, 1, b'varbinary')
tdSql.checkData(3, 1, b'\xe4\xb8\xad\xe6\x96\x87')
tdSql.checkData(4, 1, b'\xe4\xb8\xad\xe6\x96\x87')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -375,6 +375,22 @@ class TDTestCase:
tdSql.checkData(11, 0, True)
tdSql.checkData(12, 0, True)
tdSql.query(f"select interp(c6) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(value, false)")
tdSql.checkRows(13)
tdSql.checkData(0, 0, False)
tdSql.checkData(1, 0, True)
tdSql.checkData(2, 0, False)
tdSql.checkData(3, 0, False)
tdSql.checkData(4, 0, False)
tdSql.checkData(5, 0, False)
tdSql.checkData(6, 0, True)
tdSql.checkData(7, 0, False)
tdSql.checkData(8, 0, False)
tdSql.checkData(9, 0, False)
tdSql.checkData(10, 0, False)
tdSql.checkData(11, 0, True)
tdSql.checkData(12, 0, False)
tdSql.query(f"select interp(c6) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(value, NULL)")
tdSql.checkRows(13)
tdSql.checkData(0, 0, None)

View File

@ -291,8 +291,8 @@ class TDTestCase:
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, "binary5")
tdSql.checkData(0, 9, "nchar5")
tdSql.checkData(1, 8, None)
tdSql.checkData(1, 9, None)
tdSql.checkData(1, 8, "-8")
tdSql.checkData(1, 9, "-9")
limit = paraDict["rowsPerTbl"]

View File

@ -210,6 +210,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -P
python3 ./test.py -f 2-query/hyperloglog.py -P -R
python3 ./test.py -f 2-query/interp.py -P
python3 ./test.py -f 2-query/interp.py -P -R
python3 ./test.py -f 2-query/fill.py -P
python3 ./test.py -f 2-query/irate.py -P
python3 ./test.py -f 2-query/irate.py -P -R
python3 ./test.py -f 2-query/join.py -P
@ -450,6 +451,7 @@ python3 ./test.py -f 2-query/arccos.py -P -Q 2
python3 ./test.py -f 2-query/arctan.py -P -Q 2
python3 ./test.py -f 2-query/query_cols_tags_and_or.py -P -Q 2
python3 ./test.py -f 2-query/interp.py -P -Q 2
python3 ./test.py -f 2-query/fill.py -P -Q 2
python3 ./test.py -f 2-query/nestedQueryInterval.py -P -Q 2
python3 ./test.py -f 2-query/stablity.py -P -Q 2
python3 ./test.py -f 2-query/stablity_1.py -P -Q 2
@ -579,6 +581,7 @@ python3 ./test.py -f 2-query/last_row.py -P -Q 3
python3 ./test.py -f 2-query/tsbsQuery.py -P -Q 3
python3 ./test.py -f 2-query/sml.py -P -Q 3
python3 ./test.py -f 2-query/interp.py -P -Q 3
python3 ./test.py -f 2-query/fill.py -P -Q 3
python3 ./test.py -f 2-query/case_when.py -P -Q 3
python3 ./test.py -f 2-query/blockSMA.py -P -Q 3
python3 ./test.py -f 2-query/projectionDesc.py -P -Q 3
@ -675,6 +678,7 @@ python3 ./test.py -f 2-query/last_row.py -P -Q 4
python3 ./test.py -f 2-query/tsbsQuery.py -P -Q 4
python3 ./test.py -f 2-query/sml.py -P -Q 4
python3 ./test.py -f 2-query/interp.py -P -Q 4
python3 ./test.py -f 2-query/fill.py -P -Q 4
python3 ./test.py -f 2-query/case_when.py -P -Q 4
python3 ./test.py -f 2-query/insert_select.py -P
python3 ./test.py -f 2-query/insert_select.py -P -R

View File

@ -271,6 +271,7 @@ python3 ./test.py -f 2-query/hyperloglog.py
python3 ./test.py -f 2-query/hyperloglog.py -R
python3 ./test.py -f 2-query/interp.py
python3 ./test.py -f 2-query/interp.py -R
python3 ./test.py -f 2-query/fill.py
python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/irate.py -R
python3 ./test.py -f 2-query/join.py
@ -496,6 +497,7 @@ python3 ./test.py -f 2-query/arccos.py -Q 2
python3 ./test.py -f 2-query/arctan.py -Q 2
python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 2
python3 ./test.py -f 2-query/interp.py -Q 2
python3 ./test.py -f 2-query/fill.py -Q 2
python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 2
python3 ./test.py -f 2-query/stablity.py -Q 2
python3 ./test.py -f 2-query/stablity_1.py -Q 2
@ -624,6 +626,7 @@ python3 ./test.py -f 2-query/last_row.py -Q 3
python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
python3 ./test.py -f 2-query/sml.py -Q 3
python3 ./test.py -f 2-query/interp.py -Q 3
python3 ./test.py -f 2-query/fill.py -Q 3
python3 ./test.py -f 2-query/case_when.py -Q 3
python3 ./test.py -f 2-query/blockSMA.py -Q 3
python3 ./test.py -f 2-query/projectionDesc.py -Q 3
@ -717,6 +720,7 @@ python3 ./test.py -f 2-query/last_row.py -Q 4
python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
python3 ./test.py -f 2-query/sml.py -Q 4
python3 ./test.py -f 2-query/interp.py -Q 4
python3 ./test.py -f 2-query/fill.py -Q 4
python3 ./test.py -f 2-query/case_when.py -Q 4
python3 ./test.py -f 2-query/insert_select.py
python3 ./test.py -f 2-query/insert_select.py -R