Merge pull request #28551 from taosdata/feat/TS-5215-2
test(blob): testing & fixes for blob
This commit is contained in:
commit
08603e5d11
|
@ -163,3 +163,15 @@ s3BucketName td-test
|
||||||
- 认为全部 S3 服务均指向同一数据源,对各个 S3 服务操作完全等价
|
- 认为全部 S3 服务均指向同一数据源,对各个 S3 服务操作完全等价
|
||||||
- 在某一 S3 服务上操作失败后会切换至其他服务,全部服务都失败后将返回最后产生的错误码
|
- 在某一 S3 服务上操作失败后会切换至其他服务,全部服务都失败后将返回最后产生的错误码
|
||||||
- 最大支持的 S3 服务配置数为 10
|
- 最大支持的 S3 服务配置数为 10
|
||||||
|
|
||||||
|
### 不依赖 Flexify 服务
|
||||||
|
|
||||||
|
用户界面同 S3,不同的地方在于下面三个参数的配置:
|
||||||
|
|
||||||
|
| # | 参数 | 示例值 | 描述 |
|
||||||
|
| :--- | :----------- | :--------------------------------------- | :----------------------------------------------------------- |
|
||||||
|
| 1 | s3EndPoint | https://fd2d01c73.blob.core.windows.net | Blob URL |
|
||||||
|
| 2 | s3AccessKey | fd2d01c73:veUy/iRBeWaI2YAerl+AStw6PPqg== | 冒号分隔的用户 accountId:accountKey |
|
||||||
|
| 3 | s3BucketName | test-container | Container name |
|
||||||
|
|
||||||
|
其中 fd2d01c73 是账户 ID;微软 Blob 存储服务只支持 Https 协议,不支持 Http。
|
||||||
|
|
|
@ -22,13 +22,16 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
|
||||||
|
|
||||||
|
void azEnd() {}
|
||||||
|
|
||||||
#if defined(USE_S3)
|
#if defined(USE_S3)
|
||||||
|
|
||||||
#include <azure/core.hpp>
|
#include <azure/core.hpp>
|
||||||
#include <azure/storage/blobs.hpp>
|
#include <azure/storage/blobs.hpp>
|
||||||
#include "td_block_blob_client.hpp"
|
#include "td_block_blob_client.hpp"
|
||||||
|
|
||||||
// Add appropriate using namespace directives
|
|
||||||
using namespace Azure::Storage;
|
using namespace Azure::Storage;
|
||||||
using namespace Azure::Storage::Blobs;
|
using namespace Azure::Storage::Blobs;
|
||||||
|
|
||||||
|
@ -40,10 +43,6 @@ extern char tsS3BucketName[TSDB_FQDN_LEN];
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
extern int8_t tsS3EpNum;
|
extern int8_t tsS3EpNum;
|
||||||
|
|
||||||
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
|
|
||||||
|
|
||||||
void azEnd() {}
|
|
||||||
|
|
||||||
static void checkPrint(const char *fmt, ...) {
|
static void checkPrint(const char *fmt, ...) {
|
||||||
va_list arg_ptr;
|
va_list arg_ptr;
|
||||||
va_start(arg_ptr, fmt);
|
va_start(arg_ptr, fmt);
|
||||||
|
@ -223,7 +222,6 @@ static int32_t azPutObjectFromFileOffsetImpl(const char *file, const char *objec
|
||||||
uint8_t blobContent[] = "Hello Azure!";
|
uint8_t blobContent[] = "Hello Azure!";
|
||||||
// Create the block blob client
|
// Create the block blob client
|
||||||
// BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
|
// BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
|
||||||
// TDBlockBlobClient blobClient(containerClient.GetBlobClient(blobName));
|
|
||||||
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
|
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
|
||||||
|
|
||||||
blobClient.UploadFrom(file, offset, size);
|
blobClient.UploadFrom(file, offset, size);
|
||||||
|
@ -467,7 +465,7 @@ int32_t azGetObjectToFile(const char *object_name, const char *fileName) {
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
|
static int32_t azGetObjectsByPrefixImpl(const char *prefix, const char *path) {
|
||||||
const std::string delimiter = "/";
|
const std::string delimiter = "/";
|
||||||
std::string accountName = tsS3AccessKeyId[0];
|
std::string accountName = tsS3AccessKeyId[0];
|
||||||
std::string accountKey = tsS3AccessKeySecret[0];
|
std::string accountKey = tsS3AccessKeySecret[0];
|
||||||
|
@ -514,6 +512,23 @@ int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
try {
|
||||||
|
code = azGetObjectsByPrefixImpl(prefix, path);
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
azError("%s: Reason Phrase: %s", __func__, e.what());
|
||||||
|
|
||||||
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
|
azError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t azDeleteObjects(const char *object_name[], int nobject) {
|
int32_t azDeleteObjects(const char *object_name[], int nobject) {
|
||||||
for (int i = 0; i < nobject; ++i) {
|
for (int i = 0; i < nobject; ++i) {
|
||||||
azDeleteObjectsByPrefix(object_name[i]);
|
azDeleteObjectsByPrefix(object_name[i]);
|
||||||
|
@ -524,10 +539,6 @@ int32_t azDeleteObjects(const char *object_name[], int nobject) {
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
|
|
||||||
|
|
||||||
void azEnd() {}
|
|
||||||
|
|
||||||
int32_t azCheckCfg() { return TSDB_CODE_SUCCESS; }
|
int32_t azCheckCfg() { return TSDB_CODE_SUCCESS; }
|
||||||
|
|
||||||
int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
||||||
|
|
|
@ -5065,13 +5065,13 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
memset(dstBuf, 0, cap);
|
memset(dstBuf, 0, cap);
|
||||||
nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
|
nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
|
||||||
if (nBytes <= 0 || nBytes >= cap) {
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
code = TSDB_CODE_OUT_OF_RANGE;
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
|
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));
|
||||||
|
|
|
@ -31,8 +31,6 @@ from frame.eos import *
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase(TBase):
|
class TDTestCase(TBase):
|
||||||
index = eutil.cpuRand(20) + 1
|
|
||||||
bucketName = f"ci-bucket{index}"
|
|
||||||
updatecfgDict = {
|
updatecfgDict = {
|
||||||
"supportVnodes":"1000",
|
"supportVnodes":"1000",
|
||||||
's3EndPoint': 'https://<account-id>.blob.core.windows.net',
|
's3EndPoint': 'https://<account-id>.blob.core.windows.net',
|
||||||
|
@ -44,7 +42,6 @@ class TDTestCase(TBase):
|
||||||
's3MigrateEnabled': '1'
|
's3MigrateEnabled': '1'
|
||||||
}
|
}
|
||||||
|
|
||||||
tdLog.info(f"assign bucketName is {bucketName}\n")
|
|
||||||
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
|
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
|
||||||
|
|
||||||
def insertData(self):
|
def insertData(self):
|
||||||
|
@ -152,13 +149,13 @@ class TDTestCase(TBase):
|
||||||
if keepLocal is not None:
|
if keepLocal is not None:
|
||||||
kw1 = f"s3_keeplocal {keepLocal}"
|
kw1 = f"s3_keeplocal {keepLocal}"
|
||||||
if chunkSize is not None:
|
if chunkSize is not None:
|
||||||
kw2 = f"s3_chunksize {chunkSize}"
|
kw2 = f"s3_chunkpages {chunkSize}"
|
||||||
if compact is not None:
|
if compact is not None:
|
||||||
kw3 = f"s3_compact {compact}"
|
kw3 = f"s3_compact {compact}"
|
||||||
|
|
||||||
sql = f" create database db1 vgroups 1 duration 1h {kw1} {kw2} {kw3}"
|
sql = f" create database db1 vgroups 1 duration 1h {kw1} {kw2} {kw3}"
|
||||||
tdSql.execute(sql, show=True)
|
tdSql.execute(sql, show=True)
|
||||||
#sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';"
|
#sql = f"select name,s3_keeplocal,s3_chunkpages,s3_compact from information_schema.ins_databases where name='db1';"
|
||||||
sql = f"select * from information_schema.ins_databases where name='db1';"
|
sql = f"select * from information_schema.ins_databases where name='db1';"
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
# 29 30 31 -> chunksize keeplocal compact
|
# 29 30 31 -> chunksize keeplocal compact
|
||||||
|
@ -172,15 +169,32 @@ class TDTestCase(TBase):
|
||||||
sql = "drop database db1"
|
sql = "drop database db1"
|
||||||
tdSql.execute(sql)
|
tdSql.execute(sql)
|
||||||
|
|
||||||
|
def checkDefault(self, keepLocal, chunkSize, compact):
|
||||||
|
sql = f" create database db1 vgroups 1"
|
||||||
|
tdSql.execute(sql, show=True)
|
||||||
|
#sql = f"select name,s3_keeplocal,s3_chunkpages,s3_compact from information_schema.ins_databases where name='db1';"
|
||||||
|
sql = f"select * from information_schema.ins_databases where name='db1';"
|
||||||
|
tdSql.query(sql)
|
||||||
|
# 29 30 31 -> chunksize keeplocal compact
|
||||||
|
if chunkSize is not None:
|
||||||
|
tdSql.checkData(0, 29, chunkSize)
|
||||||
|
if keepLocal is not None:
|
||||||
|
keepLocalm = keepLocal * 24 * 60
|
||||||
|
tdSql.checkData(0, 30, f"{keepLocalm}m")
|
||||||
|
if compact is not None:
|
||||||
|
tdSql.checkData(0, 31, compact)
|
||||||
|
sql = "drop database db1"
|
||||||
|
tdSql.execute(sql)
|
||||||
|
|
||||||
def checkExcept(self):
|
def checkExcept(self):
|
||||||
# errors
|
# errors
|
||||||
sqls = [
|
sqls = [
|
||||||
f"create database db2 s3_keeplocal -1",
|
f"create database db2 s3_keeplocal -1",
|
||||||
f"create database db2 s3_keeplocal 0",
|
f"create database db2 s3_keeplocal 0",
|
||||||
f"create database db2 s3_keeplocal 365001",
|
f"create database db2 s3_keeplocal 365001",
|
||||||
f"create database db2 s3_chunksize -1",
|
f"create database db2 s3_chunkpages -1",
|
||||||
f"create database db2 s3_chunksize 0",
|
f"create database db2 s3_chunkpages 0",
|
||||||
f"create database db2 s3_chunksize 900000000",
|
f"create database db2 s3_chunkpages 900000000",
|
||||||
f"create database db2 s3_compact -1",
|
f"create database db2 s3_compact -1",
|
||||||
f"create database db2 s3_compact 100",
|
f"create database db2 s3_compact 100",
|
||||||
f"create database db2 duration 1d s3_keeplocal 1d"
|
f"create database db2 duration 1d s3_keeplocal 1d"
|
||||||
|
@ -226,16 +240,7 @@ class TDTestCase(TBase):
|
||||||
|
|
||||||
# except
|
# except
|
||||||
self.checkExcept()
|
self.checkExcept()
|
||||||
|
self.checkDefault(365, 131072, 1)
|
||||||
#
|
|
||||||
def preDb(self, vgroups):
|
|
||||||
cnt = int(time.time())%2 + 1
|
|
||||||
for i in range(cnt):
|
|
||||||
vg = eutil.cpuRand(9) + 1
|
|
||||||
sql = f"create database predb vgroups {vg}"
|
|
||||||
tdSql.execute(sql, show=True)
|
|
||||||
sql = "drop database predb"
|
|
||||||
tdSql.execute(sql, show=True)
|
|
||||||
|
|
||||||
# history
|
# history
|
||||||
def insertHistory(self):
|
def insertHistory(self):
|
||||||
|
@ -287,9 +292,6 @@ class TDTestCase(TBase):
|
||||||
if eos.isArm64Cpu():
|
if eos.isArm64Cpu():
|
||||||
tdLog.success(f"{__file__} arm64 ignore executed")
|
tdLog.success(f"{__file__} arm64 ignore executed")
|
||||||
else:
|
else:
|
||||||
|
|
||||||
self.preDb(10)
|
|
||||||
|
|
||||||
# insert data
|
# insert data
|
||||||
self.insertData()
|
self.insertData()
|
||||||
|
|
||||||
|
@ -311,7 +313,6 @@ class TDTestCase(TBase):
|
||||||
# check insert correct again
|
# check insert correct again
|
||||||
self.checkInsertCorrect()
|
self.checkInsertCorrect()
|
||||||
|
|
||||||
|
|
||||||
# check stream correct and drop stream
|
# check stream correct and drop stream
|
||||||
#self.checkStreamCorrect()
|
#self.checkStreamCorrect()
|
||||||
|
|
||||||
|
@ -321,7 +322,7 @@ class TDTestCase(TBase):
|
||||||
# insert history disorder data
|
# insert history disorder data
|
||||||
self.insertHistory()
|
self.insertHistory()
|
||||||
|
|
||||||
# checkBasic
|
# check db params
|
||||||
self.checkBasic()
|
self.checkBasic()
|
||||||
|
|
||||||
#self.checkInsertCorrect()
|
#self.checkInsertCorrect()
|
||||||
|
@ -335,10 +336,8 @@ class TDTestCase(TBase):
|
||||||
# drop database and free s3 file
|
# drop database and free s3 file
|
||||||
self.dropDb()
|
self.dropDb()
|
||||||
|
|
||||||
|
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
{
|
||||||
|
"filetype": "insert",
|
||||||
|
"cfgdir": "/etc/taos",
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 6030,
|
||||||
|
"user": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"connection_pool_size": 8,
|
||||||
|
"num_of_records_per_req": 4000,
|
||||||
|
"prepared_rand": 500,
|
||||||
|
"thread_count": 4,
|
||||||
|
"create_table_thread_count": 1,
|
||||||
|
"confirm_parameter_prompt": "no",
|
||||||
|
"databases": [
|
||||||
|
{
|
||||||
|
"dbinfo": {
|
||||||
|
"name": "db",
|
||||||
|
"drop": "yes",
|
||||||
|
"vgroups": 3,
|
||||||
|
"replica": 3,
|
||||||
|
"duration":"10d",
|
||||||
|
"s3_keeplocal":"30d",
|
||||||
|
"s3_chunkpages":"131072",
|
||||||
|
"tsdb_pagesize":"1",
|
||||||
|
"s3_compact":"1",
|
||||||
|
"wal_retention_size":"1",
|
||||||
|
"wal_retention_period":"1",
|
||||||
|
"flush_each_batch":"no",
|
||||||
|
"keep": "3650d"
|
||||||
|
},
|
||||||
|
"super_tables": [
|
||||||
|
{
|
||||||
|
"name": "stb",
|
||||||
|
"child_table_exists": "no",
|
||||||
|
"childtable_count": 500,
|
||||||
|
"insert_rows": 200000,
|
||||||
|
"childtable_prefix": "d",
|
||||||
|
"insert_mode": "taosc",
|
||||||
|
"timestamp_step": 100,
|
||||||
|
"start_timestamp": 1600000000000,
|
||||||
|
"columns": [
|
||||||
|
{ "type": "bool", "name": "bc"},
|
||||||
|
{ "type": "float", "name": "fc" },
|
||||||
|
{ "type": "double", "name": "dc"},
|
||||||
|
{ "type": "tinyint", "name": "ti"},
|
||||||
|
{ "type": "smallint", "name": "si" },
|
||||||
|
{ "type": "int", "name": "ic" ,"max": 1,"min": 1},
|
||||||
|
{ "type": "bigint", "name": "bi" },
|
||||||
|
{ "type": "utinyint", "name": "uti"},
|
||||||
|
{ "type": "usmallint", "name": "usi"},
|
||||||
|
{ "type": "uint", "name": "ui" },
|
||||||
|
{ "type": "ubigint", "name": "ubi"},
|
||||||
|
{ "type": "binary", "name": "bin", "len": 50},
|
||||||
|
{ "type": "nchar", "name": "nch", "len": 100}
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
|
||||||
|
{"name": "location","type": "binary", "len": 16, "values":
|
||||||
|
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
"replica": 1,
|
"replica": 1,
|
||||||
"duration":"10d",
|
"duration":"10d",
|
||||||
"s3_keeplocal":"30d",
|
"s3_keeplocal":"30d",
|
||||||
"s3_chunksize":"131072",
|
"s3_chunkpages":"131072",
|
||||||
"tsdb_pagesize":"1",
|
"tsdb_pagesize":"1",
|
||||||
"s3_compact":"1",
|
"s3_compact":"1",
|
||||||
"wal_retention_size":"1",
|
"wal_retention_size":"1",
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
"replica": 1,
|
"replica": 1,
|
||||||
"duration":"10d",
|
"duration":"10d",
|
||||||
"s3_keeplocal":"30d",
|
"s3_keeplocal":"30d",
|
||||||
"s3_chunksize":"131072",
|
"s3_chunkpages":"131072",
|
||||||
"tsdb_pagesize":"1",
|
"tsdb_pagesize":"1",
|
||||||
"s3_compact":"1",
|
"s3_compact":"1",
|
||||||
"wal_retention_size":"1",
|
"wal_retention_size":"1",
|
||||||
|
|
|
@ -62,12 +62,30 @@ class TDTestCase:
|
||||||
tdSql.query("show dnode 1 variables like '____debugFlag'")
|
tdSql.query("show dnode 1 variables like '____debugFlag'")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
|
||||||
tdSql.query("show dnode 1 variables like 's3MigrateEna%'")
|
tdSql.query("show dnode 1 variables like 's3MigrateEnab%'")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
tdSql.checkData(0, 0, 1)
|
tdSql.checkData(0, 0, 1)
|
||||||
tdSql.checkData(0, 1, 's3MigrateEnabled')
|
tdSql.checkData(0, 1, 's3MigrateEnabled')
|
||||||
tdSql.checkData(0, 2, 0)
|
tdSql.checkData(0, 2, 0)
|
||||||
|
|
||||||
|
tdSql.query("show dnode 1 variables like 's3MigrateIntervalSec%'")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
tdSql.checkData(0, 1, 's3MigrateIntervalSec')
|
||||||
|
tdSql.checkData(0, 2, 3600)
|
||||||
|
|
||||||
|
tdSql.query("show dnode 1 variables like 's3PageCacheSize%'")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
tdSql.checkData(0, 1, 's3PageCacheSize')
|
||||||
|
tdSql.checkData(0, 2, 4096)
|
||||||
|
|
||||||
|
tdSql.query("show dnode 1 variables like 's3UploadDelaySec%'")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
tdSql.checkData(0, 1, 's3UploadDelaySec')
|
||||||
|
tdSql.checkData(0, 2, 60)
|
||||||
|
|
||||||
def threadTest(self, threadID):
|
def threadTest(self, threadID):
|
||||||
print(f"Thread {threadID} starting...")
|
print(f"Thread {threadID} starting...")
|
||||||
tdsqln = tdCom.newTdSql()
|
tdsqln = tdCom.newTdSql()
|
||||||
|
|
Loading…
Reference in New Issue