Merge branch '3.0' into merge/3.0tomain

This commit is contained in:
Shengliang Guan 2024-12-26 19:26:18 +08:00
commit 668df53dc8
45 changed files with 1371 additions and 246 deletions

View File

@ -35,119 +35,67 @@ taosdump -i /file/path -h localhost -P 6030
# 2. 基于 TDengine Enterprise 进行数据备份恢复
## 2.1. 概
## 2.1. 概
基于 TDengine 的数据订阅功能TDengine Enterprise 实现了数据的增量备份和恢复。用户可以通过 taosExplorer 对 TDengine
集群进行备份和恢复。
TDengine Enterprise 的备份和恢复功能包括以下几个概念:
1. 备份对象:用户可以对一个数据库,或者一个超级表进行备份。
2. 备份计划:用户可以为某个备份对象创建一个备份计划。备份计划从指定的时间点开始,周期性的执行一次备份任务,并生成一组备份文件。
3. 备份点:每执行一次备份任务,生成一组备份文件,它们对应一个时间点,称为**备份点**。第一个备份点称为**初始备份点**。
4. 备份文件:多个备份点,组成备份计划的备份文件。
5. 恢复任务:用户可以选择备份计划的某个备份点,创建一个恢复任务。恢复任务会从初始备份点开始,逐个应用备份点,恢复到指定的备份点。
1. 增量数据备份:基于 TDengine 的数据订阅功能,将**备份对象**的所有数据变更(包括:新增、修改、删除、元数据变更等)记录下来,生成备份文件。
2. 数据恢复:使用增量数据备份生成的备份文件,将**备份对象**恢复到指定的时间点。
3. 备份对象:用户备份的对象,可以是一个**数据库**,也可以是一个**超级表**。
4. 备份计划:用户为备份对象创建一个周期性执行的备份任务。备份计划从指定的时间点开始,以**备份周期**为间隔,周期性地执行备份任务。备份任务每次生成一个**备份点**。
5. 备份点:每次执行备份任务,生成一组备份文件,它们对应一个时间点,称为**备份点**。第一个备份点称为**初始备份点**。
6. 恢复任务:用户选择备份计划的某个备份点,创建一个恢复任务。恢复任务从**初始备份点**开始,逐个回放**备份文件**中的数据变更,直到指定的备份点结束。
![backup-zh-00.png](./pic/backup-00-concept.png "数据备份和恢复")
以上面的图为例:
以上图为例:
1. 用户创建了一个备份计划,从 2024-08-27 00:00:00 开始,每隔 1 天执行一次备份任务。
2. 在 2024-08-27 00:00:00 执行了第一次备份任务,生成了一个备份点。
3. 之后,每隔 1 天执行一次备份任务,生成了多个备份点,组成了备份文件
4. 用户可以选择某个备份点,创建一个恢复任务,恢复到指定的备份点
1. 用户创建了一个**备份计划**,从 2024-08-27 00:00:00 开始,每隔 1 天执行一次**备份任务**
2. 在 2024-08-27 00:00:00 执行了第一次备份任务,生成了一个**初始备份点**
3. 之后,每隔 1 天执行一次备份任务,生成了多个**备份点**
4. 用户可以选择某个**备份点**,创建一个**恢复任务**
5. 恢复任务会从初始备份点开始,逐个应用备份点,恢复到指定的备份点。
## 2.2. 备份计划
## 2.2. 数据备份
### 2.1.1. 创建
1. 通过浏览器访问 taosExplorer 服务,访问地址通常为 TDengine 集群所在 IP 地址的端口 6060如 http://localhost:6060。
2. 在 taosExplorer 服务页面中,进入“系统管理 - 备份”页面,点击“创建备份计划”按钮。
![backup-zh-01.png](./pic/backup-01-create.png "创建备份计划")
3. 在弹出的“创建备份计划”表单中,填写备份计划的相关信息。
![backup-zh-02.png](./pic/backup-02-form.png "填写备份计划信息")
通过浏览器访问 taosExplorer 服务,访问地址通常为 TDengine 集群所在 IP 地址的端口 6060如 http://localhost:6060。 在
taosExplorer 服务页面中,进入“系统管理 - 备份”页面,在“备份计划”标签页下,点击“创建备份计划”,填写备份计划的相关信息。
需要填写的信息包括:
* 数据库:需要备份的数据库名称。一个备份计划只能备份一个数据库/超级表。
* 超级表:需要备份的超级表名称。如果不填写,则备份整个数据库。
* 下次执行时间:首次执行备份任务的日期时间。
* 备份周期:备份点之间的时间间隔。注意:备份周期必须大于数据库的 WAL_RETENTION_PERIOD 参数值。
* 错误重试次数:对于可通过重试解决的错误,系统会按照此次数进行重试。
* 错误重试间隔:每次重试之间的时间间隔。
* 目录:存储备份文件的目录。
* 备份文件大小:备份文件的大小限制。当备份文件大小达到此限制时,会自动创建新的备份文件。
* 文件压缩等级:备份文件的压缩等级。支持:最快速度、最佳压缩比、兼具速度和压缩比。
1. 数据库:需要备份的数据库名称。一个备份计划只能备份一个数据库/超级表。
2. 超级表:需要备份的超级表名称。如果不填写,则备份整个数据库。
3. 下次执行时间:首次执行备份任务的日期时间。
4. 备份周期:备份点之间的时间间隔。注意:备份周期必须大于数据库的 WAL_RETENTION_PERIOD 参数值。
5. 错误重试次数:对于可通过重试解决的错误,系统会按照此次数进行重试。
6. 错误重试间隔:每次重试之间的时间间隔。
7. 目录:存储备份文件的目录。
8. 备份文件大小:备份文件的大小限制。当备份文件大小达到此限制时,会自动创建新的备份文件。
9. 文件压缩等级:备份文件的压缩等级。支持:最快速度、最佳压缩比、兼具速度和压缩比。
创建成功后,备份计划会开始按照配置的参数运行。
创建成功后,备份计划会开始按照配置的参数运行。在“备份计划”下的列表中,可以查看已创建的备份计划。
### 2.1.2. 查看
备份计划支持以下操作:
在“备份计划”下的列表中,可以查看已创建的备份计划。
1. 查看:显示备份计划的详细信息。
2. 修改:修改备份计划的配置。修改备份计划的配置后,当前运行的备份任务会先停止,然后按照新的配置重新运行。
3. 复制:以选中的备份计划为模版,创建新的备份计划。除了数据库和超级表需要用户选择以外,其他配置项和被复制的计划相同。
4. 删除:删除备份计划。删除备份计划时,可以选择是否删除关联的备份文件。
5. 指标:查看备份计划的统计指标。
6. 查看备份点:查看和备份计划关联的所有备份点。
![backup-zh-03.png](./pic/backup-03-list.png "查看备份计划列表")
## 2.3. 备份文件
点击“操作”中的“查看”按钮,可以查看备份计划的详细信息。
在“备份文件”列表中,可以查看备份文件的详细信息。
![backup-zh-04.png](./pic/backup-04-view.png "查看备份计划详情")
## 2.4. 数据恢复
### 2.1.3. 修改
在“备份文件”列表中,选择一个备份点,可以创建一个恢复任务,数据库恢复到指定的时间。
点击“操作”中的“修改”按钮,可以修改备份计划的配置。
![backup-zh-05.png](./pic/backup-05-edit.png "修改备份计划")
修改备份计划的配置后,当前运行的备份任务会先停止,然后按照新的配置重新运行。
### 2.1.4. 复制
点击“操作”中的“复制”按钮,可以复制备份计划。
![backup-zh-06.png](./pic/backup-06-copy.png "复制备份计划")
除了数据库和超级表被置为空外,其他配置项和被复制的计划相同。用户点击“确认”后,创建一个新的备份计划。
### 2.1.5. 删除
在操作中点击关闭按钮,可以停止当前备份计划。点击“操作”中的“删除”按钮,可以删除备份计划。
![backup-zh-07.png](./pic/backup-07-del.png "删除备份计划")
删除备份计划时,可以选择,是否删除关联的备份文件。
## 2.2. 备份文件
### 2.2.1. 查看
在备份计划列表中,选择要一个备份计划。在“备份文件”列中,点击“查看”按钮。可以查看和备份计划的所有备份点。
![backup-zh-08.png](./pic/backup-08-files.png "查看备份文件")
在备份文件列表中,可以查看备份文件的详细信息。
![backup-zh-09.png](./pic/backup-09-filelist.png "查看备份文件列表")
## 2.3. 恢复任务
### 2.3.1. 创建
在备份文件列表中,点击“操作”中的“恢复”按钮,可以创建一个恢复任务。
![backup-zh-10.png](./pic/backup-10-restore-create.png "创建恢复任务")
在弹出的对话框中,选择使用哪个备份点开始恢复,默认为最早的备份点。点击“确定”后,创建恢复任务,并跳转至“恢复任务”列表。
### 2.3.2. 查看
在“恢复任务”列表中,可以查看已创建的恢复任务。
![backup-zh-11.png](./pic/backup-11-restore-list.png "查看恢复任务列表")
恢复任务可以终止。点击“操作”中的开关,可以终止当前恢复任务。
在“恢复任务”列表中,可以查看已创建的恢复任务。恢复任务可以终止。
# 3. 常见错误排查

Binary file not shown.

Before

Width:  |  Height:  |  Size: 32 KiB

After

Width:  |  Height:  |  Size: 57 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 55 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 119 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

View File

@ -389,7 +389,6 @@ typedef struct SStateStore {
int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
void (*streamStateSetFillInfo)(SStreamState* pState);
void (*streamStateClearExpiredState)(SStreamState* pState);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
@ -455,7 +454,6 @@ typedef struct SStateStore {
int32_t (*streamStateBegin)(SStreamState* pState);
void (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
} SStateStore;

View File

@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);
void streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState, bool remove);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
int32_t streamStateDelTaskDb(SStreamState* pState);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// twa
void streamStateSetFillInfo(SStreamState* pState);
void streamStateClearExpiredState(SStreamState* pState);
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);

View File

@ -148,7 +148,7 @@ int32_t tfsMkdirRecur(STfs *pTfs, const char *rname);
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
#if 0
/**
* @brief check directories exist in tfs.
*
@ -158,7 +158,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
* @return true for exist, false for not exist.
*/
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId);
#endif
/**
* @brief Remove directory at all levels in tfs.
*
@ -241,7 +241,7 @@ void tfsBasename(const STfsFile *pFile, char *dest);
* @param dest The buffer where dirname will be saved.
*/
void tfsDirname(const STfsFile *pFile, char *dest);
#if 0
/**
* @brief Get the absolute file name of rname.
*
@ -251,7 +251,7 @@ void tfsDirname(const STfsFile *pFile, char *dest);
* @param aname absolute file name
*/
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname);
#endif
/**
* @brief Remove file in tfs.
*

View File

@ -66,9 +66,9 @@ int32_t s3Begin() {
void s3End() { S3_deinitialize(); }
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
#if 0
static int32_t s3ListBucket(char const *bucketname);
#endif
static void s3DumpCfgByEp(int8_t epIndex) {
// clang-format off
(void)fprintf(stdout,
@ -291,7 +291,7 @@ static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) {
TAOS_RETURN(code);
}
#if 0
static int32_t s3ListBucket(char const *bucketname) {
int32_t code = 0;
@ -312,7 +312,7 @@ static int32_t s3ListBucket(char const *bucketname) {
TAOS_RETURN(code);
}
#endif
typedef struct growbuffer {
// The total number of bytes, and the start byte
int size;

View File

@ -40,6 +40,46 @@ add_test(
COMMAND dataformatTest
)
# cosCpTest.cpp
add_executable(cosCpTest "")
target_sources(
cosCpTest
PRIVATE
"cosCpTest.cpp"
)
target_link_libraries(cosCpTest gtest gtest_main util common)
target_include_directories(
cosCpTest
PUBLIC "${TD_SOURCE_DIR}/include/common"
PUBLIC "${TD_SOURCE_DIR}/include/util"
)
add_test(
NAME cosCpTest
COMMAND cosCpTest
)
if(TD_LINUX)
# cosTest.cpp
add_executable(cosTest "")
target_sources(
cosTest
PRIVATE
"cosTest.cpp"
)
target_link_libraries(cosTest gtest gtest_main util common)
target_include_directories(
cosTest
PUBLIC "${TD_SOURCE_DIR}/include/common"
PUBLIC "${TD_SOURCE_DIR}/include/util"
)
add_test(
NAME cosTest
COMMAND cosTest
)
endif()
if (${TD_LINUX})
# tmsg test
add_executable(tmsgTest "")

View File

@ -0,0 +1,305 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cos_cp.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, cpOpenCloseRemove) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpBuild) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size);
EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpLoad) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
if (taosCheckExistFile(file_cp_path)) {
EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS);
EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true);
EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD);
EXPECT_EQ(cp.md5, std::string(""));
EXPECT_EQ(cp.thefile, nullptr);
EXPECT_EQ(std::string(cp.file_path), "./afile");
EXPECT_EQ(cp.file_size, 1024);
EXPECT_EQ(cp.file_last_modified, 20241220141705);
EXPECT_EQ(cp.file_md5, std::string(""));
EXPECT_EQ(cp.object_name, std::string(""));
EXPECT_EQ(cp.object_size, 0);
EXPECT_EQ(cp.object_last_modified, std::string(""));
EXPECT_EQ(cp.object_etag, std::string(""));
EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx"));
EXPECT_EQ(cp.part_num, 1);
EXPECT_EQ(cp.part_size, 8388608);
EXPECT_EQ(cp.parts[0].index, 0);
EXPECT_EQ(cp.parts[0].offset, 0);
EXPECT_EQ(cp.parts[0].size, 1024);
EXPECT_EQ(cp.parts[0].completed, 0);
EXPECT_EQ(cp.parts[0].etag, std::string(""));
EXPECT_EQ(cp.parts[0].crc64, 0);
}
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpBuildUpdate) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
int seq = 1;
char *etags[1] = {"etags-1-xxx"};
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size);
cos_cp_update(&cp, cp.parts[seq - 1].index, etags[seq - 1], 0);
EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpLoadUpdate) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
if (taosCheckExistFile(file_cp_path)) {
EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS);
EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true);
EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD);
EXPECT_EQ(cp.md5, std::string(""));
EXPECT_EQ(cp.thefile, nullptr);
EXPECT_EQ(std::string(cp.file_path), "./afile");
EXPECT_EQ(cp.file_size, 1024);
EXPECT_EQ(cp.file_last_modified, 20241220141705);
EXPECT_EQ(cp.file_md5, std::string(""));
EXPECT_EQ(cp.object_name, std::string(""));
EXPECT_EQ(cp.object_size, 0);
EXPECT_EQ(cp.object_last_modified, std::string(""));
EXPECT_EQ(cp.object_etag, std::string(""));
EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx"));
EXPECT_EQ(cp.part_num, 1);
EXPECT_EQ(cp.part_size, 8388608);
EXPECT_EQ(cp.parts[0].index, 0);
EXPECT_EQ(cp.parts[0].offset, 0);
EXPECT_EQ(cp.parts[0].size, 1024);
EXPECT_EQ(cp.parts[0].completed, 1);
EXPECT_EQ(cp.parts[0].etag, std::string("etags-1-xxx"));
EXPECT_EQ(cp.parts[0].crc64, 0);
}
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}

View File

@ -0,0 +1,185 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cos.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
int32_t cosInitEnv() {
int32_t code = 0;
bool isBlob = false;
extern int8_t tsS3Ablob;
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
extern char tsS3BucketName[TSDB_FQDN_LEN];
tsS3Ablob = isBlob;
/*
const char *hostname = "endpoint/<account-name>.blob.core.windows.net";
const char *accessKeyId = "<access-key-id/account-name>";
const char *accessKeySecret = "<access-key-secret/account-key>";
const char *bucketName = "<bucket/container-name>";
*/
// const char *hostname = "http://192.168.1.52:9000";
// const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel";
// const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX";
// const char *bucketName = "test-bucket";
const char *hostname = "192.168.1.52:9000";
const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel";
const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX";
const char *bucketName = "ci-bucket19";
tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN);
// setup s3 env
extern int8_t tsS3EpNum;
extern int8_t tsS3Https[TSDB_MAX_EP_NUM];
tsS3EpNum = 1;
tsS3Https[0] = false;
tstrncpy(tsTempDir, "/tmp/", PATH_MAX);
tsS3Enabled = true;
return code;
}
TEST(testCase, cosCpPutError) {
int32_t code = 0, lino = 0;
char const *objectName = "testObject";
EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS);
EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS);
#if defined(USE_S3)
EXPECT_EQ(s3Size(objectName), -1);
#else
EXPECT_EQ(s3Size(objectName), 0);
#endif
s3EvictCache("", 0);
s3End();
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cosCpPut) {
int32_t code = 0, lino = 0;
int8_t with_cp = 0;
char *data = nullptr;
const long objectSize = 65 * 1024 * 1024;
char const *objectName = "cosut.bin";
const char object_name[] = "cosut.bin";
EXPECT_EQ(std::string(object_name), objectName);
EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS);
EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS);
{
data = (char *)taosMemoryCalloc(1, objectSize);
if (!data) {
TAOS_CHECK_EXIT(terrno);
}
for (int i = 0; i < objectSize / 2; ++i) {
data[i * 2 + 1] = 1;
}
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, objectSize);
GTEST_ASSERT_EQ(n, objectSize);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = s3PutObjectFromFile2(path, objectName, with_cp);
GTEST_ASSERT_EQ(code, 0);
with_cp = 1;
code = s3PutObjectFromFile2(path, objectName, with_cp);
GTEST_ASSERT_EQ(code, 0);
#if defined(USE_S3)
EXPECT_EQ(s3Size(objectName), objectSize);
#else
EXPECT_EQ(s3Size(objectName), 0);
#endif
s3End();
s3EvictCache("", 0);
taosMemoryFree(data);
EXPECT_EQ(taosRemoveFile(path), TSDB_CODE_SUCCESS);
}
return;
_exit:
if (data) {
taosMemoryFree(data);
s3End();
}
std::cout << "code: " << code << std::endl;
}

View File

@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -38,6 +38,8 @@ TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::strin
return newClient;
}
TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {}
#if 0
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr<StorageSharedKeyCredential> credential,
const BlobClientOptions& options)
: BlobClient(blobUrl, std::move(credential), options) {}
@ -50,8 +52,6 @@ TDBlockBlobClient::TDBlockBlobClient(const std::string&
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options)
: BlobClient(blobUrl, options) {}
TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {}
TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const {
TDBlockBlobClient newClient(*this);
if (snapshot.empty()) {
@ -74,47 +74,6 @@ TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId)
return newClient;
}
Azure::Response<Models::UploadBlockBlobResult> TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content,
const UploadBlockBlobOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions;
if (options.TransactionalContentHash.HasValue()) {
if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
} else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
}
}
protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
protocolLayerOptions.Tier = options.AccessTier;
protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
if (m_customerProvidedKey.HasValue()) {
protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
}
protocolLayerOptions.EncryptionScope = m_encryptionScope;
if (options.ImmutabilityPolicy.HasValue()) {
protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
}
protocolLayerOptions.LegalHold = options.HasLegalHold;
return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options,
const Azure::Core::Context& context) const {
@ -270,6 +229,47 @@ Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom
return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
std::move(commitBlockListResponse.RawResponse));
}
#endif
Azure::Response<Models::UploadBlockBlobResult> TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content,
const UploadBlockBlobOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions;
if (options.TransactionalContentHash.HasValue()) {
if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
} else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
}
}
protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
protocolLayerOptions.Tier = options.AccessTier;
protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
if (m_customerProvidedKey.HasValue()) {
protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
}
protocolLayerOptions.EncryptionScope = m_encryptionScope;
if (options.ImmutabilityPolicy.HasValue()) {
protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
}
protocolLayerOptions.LegalHold = options.HasLegalHold;
return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options,
@ -349,7 +349,7 @@ Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom
return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
std::move(commitBlockListResponse.RawResponse));
}
#if 0
Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadFromUri(
const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options,
const Azure::Core::Context& context) const {
@ -396,7 +396,7 @@ Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadF
return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#endif
Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const std::string& blockId,
Azure::Core::IO::BodyStream& content,
const StageBlockOptions& options,
@ -419,7 +419,7 @@ Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const st
protocolLayerOptions.EncryptionScope = m_encryptionScope;
return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
#if 0
Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFromUri(
const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options,
const Azure::Core::Context& context) const {
@ -457,7 +457,7 @@ Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFr
return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#endif
Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockList(
const std::vector<std::string>& blockIds, const CommitBlockListOptions& options,
const Azure::Core::Context& context) const {
@ -492,7 +492,7 @@ Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockLis
return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#if 0
Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions;
@ -502,6 +502,7 @@ Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(cons
return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions,
_internal::WithReplicaStatus(context));
}
#endif
/*
Azure::Response<Models::QueryBlobResult> TDBlockBlobClient::Query(const std::string& querySqlExpression,
const QueryBlobOptions& options,

View File

@ -0,0 +1,210 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
// clang-format off
#include "td_block_blob_client.hpp"
#include "az.h"
// clang-format on
using namespace Azure::Storage;
using namespace Azure::Storage::Blobs;
extern int8_t tsS3Enabled;
extern char tsS3BucketName[TSDB_FQDN_LEN];
static int32_t azInitEnv() {
int32_t code = 0;
extern int8_t tsS3EpNum;
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
/* TCS parameter format
tsS3Hostname[0] = "<endpoint>/<account-name>.blob.core.windows.net";
tsS3AccessKeyId[0] = "<access-key-id/account-name>";
tsS3AccessKeySecret[0] = "<access-key-secret/account-key>";
tsS3BucketName = "<bucket/container-name>";
*/
const char *hostname = "<endpoint>/<account-name>.blob.core.windows.net";
const char *accessKeyId = "<access-key-id/account-name>";
const char *accessKeySecret = "<access-key-secret/account-key>";
const char *bucketName = "<bucket/container-name>";
if (hostname[0] != '<') {
tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN);
} else {
const char *accountId = getenv("ablob_account_id");
if (!accountId) {
return -1;
}
const char *accountSecret = getenv("ablob_account_secret");
if (!accountSecret) {
return -1;
}
const char *containerName = getenv("ablob_container");
if (!containerName) {
return -1;
}
TAOS_STRCPY(&tsS3Hostname[0][0], accountId);
TAOS_STRCAT(&tsS3Hostname[0][0], ".blob.core.windows.net");
TAOS_STRCPY(&tsS3AccessKeyId[0][0], accountId);
TAOS_STRCPY(&tsS3AccessKeySecret[0][0], accountSecret);
TAOS_STRCPY(tsS3BucketName, containerName);
}
tstrncpy(tsTempDir, "/tmp/", PATH_MAX);
tsS3Enabled = true;
return code;
}
// TEST(AzTest, DISABLED_InterfaceTest) {
TEST(AzETest, InterfaceTest) {
int code = 0;
bool check = false;
bool withcp = false;
code = azInitEnv();
if (code) {
std::cout << "ablob env init failed with: " << code << std::endl;
return;
}
GTEST_ASSERT_EQ(code, 0);
GTEST_ASSERT_EQ(tsS3Enabled, 1);
code = azBegin();
GTEST_ASSERT_EQ(code, 0);
code = azCheckCfg();
GTEST_ASSERT_EQ(code, 0);
const int size = 4096;
char data[size] = {0};
for (int i = 0; i < size / 2; ++i) {
data[i * 2 + 1] = 1;
}
const char object_name[] = "azut.bin";
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = azPutObjectFromFileOffset(path, object_name, 0, size);
GTEST_ASSERT_EQ(code, 0);
uint8_t *pBlock = NULL;
code = azGetObjectBlock(object_name, 0, size, check, &pBlock);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(pBlock[i * 2], 0);
GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1);
}
taosMemoryFree(pBlock);
code = azGetObjectToFile(object_name, path_download);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
azDeleteObjectsByPrefix(object_name);
// list object to check
code = azPutObjectFromFile2(path, object_name, withcp);
GTEST_ASSERT_EQ(code, 0);
code = azGetObjectsByPrefix(object_name, tsTempDir);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
TDBlockBlobClient blobClient =
TDBlockBlobClient::CreateFromConnectionString(std::getenv("ablob_cs"), std::string(tsS3BucketName), object_name);
const char *object_name_arr[] = {object_name};
code = azDeleteObjects(object_name_arr, 1);
GTEST_ASSERT_EQ(code, 0);
azEnd();
}

View File

@ -199,3 +199,132 @@ TEST(AzTest, InterfaceTest) {
azEnd();
}
// TEST(AzTest, DISABLED_InterfaceTestBig) {
TEST(AzTest, InterfaceTestBig) {
int code = 0;
bool check = false;
bool withcp = false;
code = azInitEnv();
if (code) {
std::cout << "ablob env init failed with: " << code << std::endl;
return;
}
GTEST_ASSERT_EQ(code, 0);
GTEST_ASSERT_EQ(tsS3Enabled, 1);
code = azBegin();
GTEST_ASSERT_EQ(code, 0);
code = azCheckCfg();
GTEST_ASSERT_EQ(code, 0);
const int size = 256 * 1024 * 1024 + 1;
char *data = (char *)taosMemoryCalloc(1, size);
if (!data) {
std::cout << "code: " << code << "terrno: " << terrno << std::endl;
return;
}
for (int i = 0; i < size / 2; ++i) {
data[i * 2 + 1] = 1;
}
const char object_name[] = "azut.bin";
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = azPutObjectFromFileOffset(path, object_name, 0, size);
GTEST_ASSERT_EQ(code, 0);
uint8_t *pBlock = NULL;
code = azGetObjectBlock(object_name, 0, size, check, &pBlock);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(pBlock[i * 2], 0);
GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1);
}
taosMemoryFree(pBlock);
code = azGetObjectToFile(object_name, path_download);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
azDeleteObjectsByPrefix(object_name);
// list object to check
code = azPutObjectFromFile2(path, object_name, withcp);
GTEST_ASSERT_EQ(code, 0);
code = azGetObjectsByPrefix(object_name, tsTempDir);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
const char *object_name_arr[] = {object_name};
code = azDeleteObjects(object_name_arr, 1);
GTEST_ASSERT_EQ(code, 0);
taosMemoryFree(data);
azEnd();
}

View File

@ -1986,10 +1986,19 @@ void catalogDestroy(void) {
}
if (gCtgMgmt.cacheTimer) {
if (taosTmrStop(gCtgMgmt.cacheTimer)) {
qTrace("stop catalog cache timer may failed");
if (!taosTmrStop(gCtgMgmt.cacheTimer)) {
/*
qDebug("catalog cacheTimer %" PRIuPTR " not stopped", (uintptr_t)gCtgMgmt.cacheTimer);
while (!taosTmrIsStopped(&gCtgMgmt.cacheTimer)) {
taosMsleep(1);
}
*/
}
qDebug("catalog cacheTimer %" PRIuPTR " is stopped", (uintptr_t)gCtgMgmt.cacheTimer);
gCtgMgmt.cacheTimer = NULL;
taosTmrCleanUp(gCtgMgmt.timer);
gCtgMgmt.timer = NULL;
}

View File

@ -480,6 +480,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
dbCache = (SCtgDBCache *)pIter;
CTG_LOCK(CTG_READ, &dbCache->dbLock);
dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache ? taosHashGetSize(dbCache->tbCache) : 0;
@ -509,6 +511,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
hashMethod, hashPrefix, hashSuffix, vgNum);
if (dbCache->vgCache.vgInfo) {
CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);
int32_t i = 0;
void *pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, NULL);
while (pVgIter) {
@ -524,6 +528,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, pVgIter);
}
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
if (dbCache->cfgCache.cfgInfo) {
@ -544,6 +550,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
pCfg->schemaless, pCfg->sstTrigger);
}
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
++i;
pIter = taosHashIterate(dbHash, pIter);
}

View File

@ -149,12 +149,13 @@ void ctgTestInitLogFile() {
return;
}
const char *defaultLogFileNamePrefix = "taoslog";
const char *defaultLogFileNamePrefix = "catalogTest";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
tmrDebugFlag = 159;
tsNumOfLogLines = 1000000000;
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
(void)ctgdEnableDebug("api", true);
@ -1839,7 +1840,7 @@ TEST(tableMeta, updateStbMeta) {
while (true) {
uint64_t n = 0;
ASSERT(0 == ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n));
if (n != 3) {
if (n < 3) {
taosMsleep(50);
} else {
break;

View File

@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList);
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t uploadCheckpointToS3(const char* id, const char* path);
int32_t deleteCheckpointFile(const char* id, const char* name);
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId);
#ifdef __cplusplus
}

View File

@ -19,7 +19,6 @@
#include "tcs.h"
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
#ifdef BUILD_NO_CALL
static int32_t deleteCheckpoint(const char* id);
@ -230,8 +229,8 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
return code;
}
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;

View File

@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) {
taosMemoryFreeClear(pState);
}
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
return deleteExpiredCheckPoint(pState->pFileState, mark);
}
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); }
void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); }
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);

View File

@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
}
}
int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (pFileState->searchBuff != NULL) {
deleteHashSortRowBuff(pFileState, pKey);
}
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
}
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
}
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t maxCheckPointId = 0;
@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}
void setFillInfo(SStreamFileState* pFileState) {
pFileState->hasFillCatch = false;
}
void clearExpiredState(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1261,6 +1241,7 @@ _end:
}
}
#ifdef BUILD_NO_CALL
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1328,6 +1309,7 @@ _end:
}
return code;
}
#endif
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) {

View File

@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest)
extern int8_t tsS3EpNum;
tsS3EpNum = 1;
code = uploadCheckpointToS3("123", "/tmp/backend5/stream");
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
code = downloadCheckpointByNameS3("123", "/root/download", "");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
code = deleteCheckpointFile("aaa123", "bbb");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
}
TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream";
code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
ASSERT(pState != NULL);
pTask->pBackend = pState->pTdbState->pOwner->pBackend;
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.checkpointId = 123;
code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
pTask->chkInfo.pActiveInfo->failedId = 223;
code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
SStreamDataBlock block;
block.srcTaskId = 456;
SStreamTask upTask;
upTask = *pTask;
upTask.id.taskId = 456;
streamTaskSetUpstreamInfo(pTask, &upTask);
pTask->chkInfo.pActiveInfo->failedId = 23;
code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
streamTaskSetUpstreamInfo(pTask, &upTask);
streamTaskSetStatusReady(pTask);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 223;
STaskCheckpointReadyInfo readyInfo;
readyInfo.upstreamTaskId = 4567;
block.srcTaskId = 4567;
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 1111;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
}

View File

@ -66,3 +66,10 @@ int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) {
*size = szBytes / szPage;
return 0;
}
void tdbCloseDir(TdDirPtr *ppDir) {
int32_t ret = taosCloseDir(ppDir);
if (ret) {
tdbError("failed to close directory, reason:%s", tstrerror(ret));
}
}

View File

@ -71,12 +71,7 @@ typedef TdFilePtr tdb_fd_t;
#define tdbGetDirEntryName taosGetDirEntryName
#define tdbDirEntryBaseName taosDirEntryBaseName
static FORCE_INLINE void tdbCloseDir(TdDirPtr *ppDir) {
int32_t ret = taosCloseDir(ppDir);
if (ret) {
tdbError("failed to close directory, reason:%s", tstrerror(ret));
}
}
void tdbCloseDir(TdDirPtr *ppDir);
#define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL)

View File

@ -16,6 +16,10 @@
#ifndef _TD_TFS_INT_H_
#define _TD_TFS_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
@ -74,6 +78,7 @@ typedef struct STfs {
SHashObj *hash; // name to did map
} STfs;
int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg);
int32_t tfsNewDisk(int32_t level, int32_t id, int8_t disable, const char *dir, STfsDisk **ppDisk);
STfsDisk *tfsFreeDisk(STfsDisk *pDisk);
int32_t tfsUpdateDiskSize(STfsDisk *pDisk);

View File

@ -19,7 +19,6 @@
static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsCheck(STfs *pTfs);
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsFormatDir(char *idir, char *odir);
static int32_t tfsGetDiskByName(STfs *pTfs, const char *dir, STfsDisk **ppDisk);
static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir);
@ -245,13 +244,13 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN);
tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN);
}
#if 0
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
(void)snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
}
#endif
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
@ -340,7 +339,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
TAOS_RETURN(0);
}
#if 0
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
char aname[TMPNAME_LEN];
@ -348,7 +347,7 @@ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) {
(void)snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
return taosDirExist(aname);
}
#endif
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
if (rname[0] == 0) {
TAOS_RETURN(0);
@ -515,7 +514,7 @@ _exit:
TAOS_RETURN(code);
}
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
int32_t code = 0;
char dirName[TSDB_FILENAME_LEN] = "\0";
@ -577,32 +576,32 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
}
static int32_t tfsFormatDir(char *idir, char *odir) {
int32_t code = 0, lino = 0;
wordexp_t wep = {0};
int32_t dirLen = 0;
char tmp[PATH_MAX] = {0};
int32_t code = wordexp(idir, &wep, 0);
code = wordexp(idir, &wep, 0);
if (code != 0) {
TAOS_RETURN(TAOS_SYSTEM_ERROR(code));
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(code));
}
char tmp[PATH_MAX] = {0};
if (taosRealPath(wep.we_wordv[0], tmp, PATH_MAX) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
wordfree(&wep);
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(taosRealPath(wep.we_wordv[0], tmp, PATH_MAX));
int32_t dirLen = strlen(tmp);
dirLen = strlen(tmp);
if (dirLen < 0 || dirLen >= TSDB_FILENAME_LEN) {
wordfree(&wep);
code = TSDB_CODE_OUT_OF_RANGE;
fError("failed to mount %s to FS since %s, real path:%s, len:%d", idir, tstrerror(code), tmp, dirLen);
TAOS_RETURN(code);
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
}
tstrncpy(odir, tmp, TSDB_FILENAME_LEN);
_exit:
wordfree(&wep);
TAOS_RETURN(0);
if (code != 0) {
fError("failed to mount %s to FS at line %d since %s, real path:%s, len:%d", idir, lino, tstrerror(code), tmp,
dirLen);
}
TAOS_RETURN(code);
}
static int32_t tfsCheck(STfs *pTfs) {

View File

@ -41,13 +41,13 @@ void tfsDestroyTier(STfsTier *pTier) {
int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk) {
int32_t code = 0;
int32_t lino = 0;
int32_t id = 0;
STfsDisk *pDisk = NULL;
if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) {
TAOS_CHECK_GOTO(TSDB_CODE_FS_TOO_MANY_MOUNT, &lino, _exit);
}
int32_t id = 0;
if (pTier->level == 0) {
if (pTier->disks[0] != NULL) {
id = pTier->ndisk;

View File

@ -7,8 +7,13 @@ target_link_libraries(
PUBLIC tfs
PUBLIC gtest_main
)
target_include_directories(
tfs_test
PUBLIC "${TD_SOURCE_DIR}/include/libs/tfs"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
# add_test(
# NAME tfs_test
# COMMAND tfs_test
# )
add_test(
NAME tfs_test
COMMAND tfs_test
)

View File

@ -13,6 +13,7 @@
#include "os.h"
#include "tfs.h"
#include "tfsInt.h"
class TfsTest : public ::testing::Test {
protected:
@ -280,6 +281,9 @@ TEST_F(TfsTest, 04_File) {
const STfsFile *pf2 = tfsReaddir(pDir);
EXPECT_EQ(pf2, nullptr);
pDir->pDir = taosOpenDir(fulldir);
EXPECT_NE(pDir->pDir, nullptr);
tfsClosedir(pDir);
}
@ -744,3 +748,116 @@ TEST_F(TfsTest, 05_MultiDisk) {
tfsClose(pTfs);
}
TEST_F(TfsTest, 06_Misc) {
// tfsDisk.c
STfsDisk *pDisk = NULL;
EXPECT_EQ(tfsNewDisk(0, 0, 0, NULL, &pDisk), TSDB_CODE_INVALID_PARA);
EXPECT_NE(tfsNewDisk(0, 0, 0, "", &pDisk), 0);
STfsDisk disk = {0};
EXPECT_EQ(tfsUpdateDiskSize(&disk), TSDB_CODE_INVALID_PARA);
// tfsTier.c
STfsTier tfsTier = {0};
EXPECT_EQ(taosThreadSpinInit(&tfsTier.lock, 0), 0);
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
tfsTier.ndisk = 3;
tfsTier.nAvailDisks = 1;
tfsTier.disks[1] = &disk;
disk.disable = 1;
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
disk.disable = 0;
disk.size.avail = 0;
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
tfsTier.ndisk = TFS_MAX_DISKS_PER_TIER;
SDiskCfg diskCfg = {0};
tstrncpy(diskCfg.dir, "testDataDir", TSDB_FILENAME_LEN);
EXPECT_EQ(tfsMountDiskToTier(&tfsTier, &diskCfg, 0), TSDB_CODE_FS_TOO_MANY_MOUNT);
EXPECT_EQ(taosThreadSpinDestroy(&tfsTier.lock), 0);
// tfs.c
STfs *pTfs = NULL;
EXPECT_EQ(tfsOpen(0, -1, &pTfs), TSDB_CODE_INVALID_PARA);
EXPECT_EQ(tfsOpen(0, 0, &pTfs), TSDB_CODE_INVALID_PARA);
EXPECT_EQ(tfsOpen(0, TFS_MAX_DISKS + 1, &pTfs), TSDB_CODE_INVALID_PARA);
taosMemoryFreeClear(pTfs);
STfs tfs = {0};
STfsTier *pTier = &tfs.tiers[0];
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, -1), false);
tfs.nlevel = 2;
pTier->ndisk = 3;
pTier->nAvailDisks = 1;
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false);
pTier->disks[0] = &disk;
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, -1, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, tfs.nlevel + 1, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, -1), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, pTier->ndisk), false);
EXPECT_EQ(tfsGetDisksAtLevel(&tfs, -1), 0);
EXPECT_EQ(tfsGetDisksAtLevel(&tfs, tfs.nlevel), 0);
EXPECT_EQ(tfsGetLevel(&tfs), tfs.nlevel);
for (int32_t l = 0; l < tfs.nlevel; ++l) {
EXPECT_EQ(taosThreadSpinInit(&tfs.tiers[l].lock, 0), 0);
}
SDiskID diskID = {0};
disk.size.avail = TFS_MIN_DISK_FREE_SIZE;
EXPECT_EQ(tfsAllocDisk(&tfs, tfs.nlevel, &diskID), 0);
tfs.nlevel = 0;
diskID.level = 0;
EXPECT_EQ(tfsAllocDisk(&tfs, 0, &diskID), 0);
tfs.nlevel = 2;
diskID.id = 10;
EXPECT_EQ(tfsMkdirAt(&tfs, NULL, diskID), TSDB_CODE_FS_INVLD_CFG);
EXPECT_NE(tfsMkdirRecurAt(&tfs, NULL, diskID), 0);
const char *rname = "";
EXPECT_EQ(tfsRmdir(&tfs, rname), 0);
EXPECT_EQ(tfsSearch(&tfs, -1, NULL), -1);
EXPECT_EQ(tfsSearch(&tfs, tfs.nlevel, NULL), -1);
diskCfg.level = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = TFS_MAX_TIERS;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = 0;
diskCfg.primary = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.primary = 2;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.primary = 1;
diskCfg.disable = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.disable = 2;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.disable = 0;
diskCfg.level = 1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = 0;
diskCfg.primary = 0;
tstrncpy(diskCfg.dir, "testDataDir1", TSDB_FILENAME_LEN);
EXPECT_NE(tfsCheckAndFormatCfg(&tfs, &diskCfg), 0);
TdFilePtr pFile = taosCreateFile("testDataDir1", TD_FILE_CREATE);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
EXPECT_EQ(taosCloseFile(&pFile), 0);
EXPECT_EQ(taosRemoveFile("testDataDir1"), 0);
for (int32_t l = 0; l < tfs.nlevel; ++l) {
EXPECT_EQ(taosThreadSpinDestroy(&tfs.tiers[l].lock), 0);
}
}

32
source/util/inc/tlogInt.h Normal file
View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_LOG_INT_H_
#define _TD_UTIL_LOG_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
void taosOpenNewSlowLogFile();
void taosLogObjSetToday(int64_t ts);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_LOG_INT_H_*/

View File

@ -483,7 +483,7 @@ static int32_t taosOpenNewLogFile() {
return 0;
}
static void taosOpenNewSlowLogFile() {
void taosOpenNewSlowLogFile() {
(void)taosThreadMutexLock(&tsLogObj.logMutex);
int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday;
if (delta >= 0 && delta < 86400) {
@ -539,6 +539,8 @@ void taosResetLog() {
}
}
void taosLogObjSetToday(int64_t ts) { tsLogObj.timestampToday = ts; }
static bool taosCheckFileIsOpen(char *logFileName) {
TdFilePtr pFile = taosOpenFile(logFileName, TD_FILE_WRITE);
if (pFile == NULL) {
@ -619,6 +621,7 @@ static void processLogFileName(const char *logName, int32_t maxFileNum) {
}
static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
int32_t code = 0, lino = 0;
#ifdef WINDOWS_STASH
/*
* always set maxFileNum to 1
@ -653,38 +656,27 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
// only an estimate for number of lines
int64_t filesize = 0;
if (taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL) != 0) {
(void)printf("\nfailed to fstat log file:%s, reason:%s\n", name, strerror(errno));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
TAOS_CHECK_EXIT(taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL));
tsLogObj.lines = (int32_t)(filesize / 60);
if (taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END) < 0) {
TAOS_UNUSED(printf("failed to seek to the end of log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
TAOS_CHECK_EXIT(terrno);
}
(void)snprintf(name, sizeof(name), "==================================================\n");
(void)snprintf(name, sizeof(name),
"==================================================\n"
" new log file\n"
"==================================================\n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)snprintf(name, sizeof(name), " new log file \n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)snprintf(name, sizeof(name), "==================================================\n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
TAOS_CHECK_EXIT(terrno);
}
_exit:
taosUnLockLogFile(tsLogObj.logHandle->pFile);
if (code != 0) {
TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code)));
}
return 0;
}

View File

@ -137,6 +137,10 @@ add_test(
NAME logTest
COMMAND logTest
)
target_include_directories(
logTest
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")

View File

@ -2,7 +2,9 @@
#include <stdlib.h>
#include <time.h>
#include <random>
#include <tdef.h>
#include <tlog.h>
#include <tlogInt.h>
#include <iostream>
using namespace std;
@ -44,3 +46,96 @@ TEST(log, check_log_refactor) {
}
taosCloseLog();
}
extern char *tsLogOutput;
TEST(log, misc) {
// taosInitLog
const char *path = TD_TMP_DIR_PATH "td";
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
EXPECT_EQ(taosInitLog("taoslog", 1, true), 0);
taosOpenNewSlowLogFile();
taosLogObjSetToday(INT64_MIN);
taosPrintSlowLog("slow log test");
// test taosInitLogOutput
const char *pLogName = NULL;
tsLogOutput = (char *)taosMemCalloc(1, TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "stdout", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "stderr", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "/dev/null", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tsLogOutput[0] = '#';
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "/", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "\\", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "testLogOutput", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "testLogOutputDir/testLogOutput", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, ".", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "/..", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tsLogOutput[0] = 0;
// test taosAssertDebug
tsAssert = false;
taosAssertDebug(true, __FILE__, __LINE__, 0, "test_assert_true_without_core");
taosAssertDebug(false, __FILE__, __LINE__, 0, "test_assert_false_with_core");
tsAssert = true;
// test taosLogCrashInfo, taosReadCrashInfo and taosReleaseCrashLogFile
char nodeType[16] = "nodeType";
char *pCrashMsg = (char *)taosMemoryCalloc(1, 16);
EXPECT_NE(pCrashMsg, nullptr);
tstrncpy(pCrashMsg, "crashMsg", 16);
#if !defined(_TD_DARWIN_64) && !defined(WINDOWS)
pid_t pid = taosGetPId();
EXPECT_EQ(pid > 0, true);
siginfo_t sigInfo = {0};
sigInfo.si_pid = pid;
taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, &sigInfo);
#else
taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, nullptr);
#endif
char crashInfo[PATH_MAX] = {0};
snprintf(crashInfo, sizeof(crashInfo), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType);
char *pReadMsg = NULL;
int64_t readMsgLen = 0;
TdFilePtr pFile = NULL;
taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile);
EXPECT_NE(pReadMsg, nullptr);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(strncasecmp(pReadMsg, "crashMsg", strlen("crashMsg")), 0);
EXPECT_EQ(taosCloseFile(&pFile), 0);
taosMemoryFreeClear(pReadMsg);
pFile = taosOpenFile(crashInfo, TD_FILE_WRITE);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(taosWriteFile(pFile, "00000", 1), 1);
EXPECT_EQ(taosCloseFile(&pFile), 0);
taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile);
EXPECT_EQ(pReadMsg, nullptr);
EXPECT_EQ(pFile, nullptr);
pFile = taosOpenFile(crashInfo, TD_FILE_WRITE);
EXPECT_NE(pFile, nullptr);
taosReleaseCrashLogFile(pFile, true);
// clean up
taosRemoveDir(path);
taosCloseLog();
}