diff --git a/docs/zh/08-operation/09-backup.md b/docs/zh/08-operation/09-backup.md index 075cc244f4..aa4f9f61a0 100644 --- a/docs/zh/08-operation/09-backup.md +++ b/docs/zh/08-operation/09-backup.md @@ -35,119 +35,67 @@ taosdump -i /file/path -h localhost -P 6030 # 2. 基于 TDengine Enterprise 进行数据备份恢复 -## 2.1. 概念 +## 2.1. 概述 基于 TDengine 的数据订阅功能,TDengine Enterprise 实现了数据的增量备份和恢复。用户可以通过 taosExplorer 对 TDengine 集群进行备份和恢复。 TDengine Enterprise 的备份和恢复功能包括以下几个概念: -1. 备份对象:用户可以对一个数据库,或者一个超级表进行备份。 -2. 备份计划:用户可以为某个备份对象创建一个备份计划。备份计划从指定的时间点开始,周期性的执行一次备份任务,并生成一组备份文件。 -3. 备份点:每执行一次备份任务,生成一组备份文件,它们对应一个时间点,称为**备份点**。第一个备份点称为**初始备份点**。 -4. 备份文件:多个备份点,组成备份计划的备份文件。 -5. 恢复任务:用户可以选择备份计划的某个备份点,创建一个恢复任务。恢复任务会从初始备份点开始,逐个应用备份点,恢复到指定的备份点。 +1. 增量数据备份:基于 TDengine 的数据订阅功能,将**备份对象**的所有数据变更(包括:新增、修改、删除、元数据变更等)记录下来,生成备份文件。 +2. 数据恢复:使用增量数据备份生成的备份文件,将**备份对象**恢复到指定的时间点。 +3. 备份对象:用户备份的对象,可以是一个**数据库**,也可以是一个**超级表**。 +4. 备份计划:用户为备份对象创建一个周期性执行的备份任务。备份计划从指定的时间点开始,以**备份周期**为间隔,周期性地执行备份任务。备份任务每次生成一个**备份点**。 +5. 备份点:每次执行备份任务,生成一组备份文件,它们对应一个时间点,称为**备份点**。第一个备份点称为**初始备份点**。 +6. 恢复任务:用户选择备份计划的某个备份点,创建一个恢复任务。恢复任务从**初始备份点**开始,逐个回放**备份文件**中的数据变更,直到指定的备份点结束。 ![backup-zh-00.png](./pic/backup-00-concept.png "数据备份和恢复") -以上面的图为例: +以上图为例: -1. 用户创建了一个备份计划,从 2024-08-27 00:00:00 开始,每隔 1 天执行一次备份任务。 -2. 在 2024-08-27 00:00:00 执行了第一次备份任务,生成了一个备份点。 -3. 之后,每隔 1 天执行一次备份任务,生成了多个备份点,组成了备份文件。 -4. 用户可以选择某个备份点,创建一个恢复任务,恢复到指定的备份点。 +1. 用户创建了一个**备份计划**,从 2024-08-27 00:00:00 开始,每隔 1 天执行一次**备份任务**。 +2. 在 2024-08-27 00:00:00 执行了第一次备份任务,生成了一个**初始备份点**。 +3. 之后,每隔 1 天执行一次备份任务,生成了多个**备份点**。 +4. 用户可以选择某个**备份点**,创建一个**恢复任务**。 5. 恢复任务会从初始备份点开始,逐个应用备份点,恢复到指定的备份点。 -## 2.2. 备份计划 +## 2.2. 数据备份 -### 2.1.1. 创建 - -1. 通过浏览器访问 taosExplorer 服务,访问地址通常为 TDengine 集群所在 IP 地址的端口 6060,如 http://localhost:6060。 -2. 在 taosExplorer 服务页面中,进入“系统管理 - 备份”页面,点击“创建备份计划”按钮。 - -![backup-zh-01.png](./pic/backup-01-create.png "创建备份计划") - -3. 在弹出的“创建备份计划”表单中,填写备份计划的相关信息。 - -![backup-zh-02.png](./pic/backup-02-form.png "填写备份计划信息") +通过浏览器访问 taosExplorer 服务,访问地址通常为 TDengine 集群所在 IP 地址的端口 6060,如 http://localhost:6060。 在 +taosExplorer 服务页面中,进入“系统管理 - 备份”页面,在“备份计划”标签页下,点击“创建备份计划”,填写备份计划的相关信息。 需要填写的信息包括: -* 数据库:需要备份的数据库名称。一个备份计划只能备份一个数据库/超级表。 -* 超级表:需要备份的超级表名称。如果不填写,则备份整个数据库。 -* 下次执行时间:首次执行备份任务的日期时间。 -* 备份周期:备份点之间的时间间隔。注意:备份周期必须大于数据库的 WAL_RETENTION_PERIOD 参数值。 -* 错误重试次数:对于可通过重试解决的错误,系统会按照此次数进行重试。 -* 错误重试间隔:每次重试之间的时间间隔。 -* 目录:存储备份文件的目录。 -* 备份文件大小:备份文件的大小限制。当备份文件大小达到此限制时,会自动创建新的备份文件。 -* 文件压缩等级:备份文件的压缩等级。支持:最快速度、最佳压缩比、兼具速度和压缩比。 +1. 数据库:需要备份的数据库名称。一个备份计划只能备份一个数据库/超级表。 +2. 超级表:需要备份的超级表名称。如果不填写,则备份整个数据库。 +3. 下次执行时间:首次执行备份任务的日期时间。 +4. 备份周期:备份点之间的时间间隔。注意:备份周期必须大于数据库的 WAL_RETENTION_PERIOD 参数值。 +5. 错误重试次数:对于可通过重试解决的错误,系统会按照此次数进行重试。 +6. 错误重试间隔:每次重试之间的时间间隔。 +7. 目录:存储备份文件的目录。 +8. 备份文件大小:备份文件的大小限制。当备份文件大小达到此限制时,会自动创建新的备份文件。 +9. 文件压缩等级:备份文件的压缩等级。支持:最快速度、最佳压缩比、兼具速度和压缩比。 -创建成功后,备份计划会开始按照配置的参数运行。 +创建成功后,备份计划会开始按照配置的参数运行。在“备份计划”下的列表中,可以查看已创建的备份计划。 -### 2.1.2. 查看 +备份计划支持以下操作: -在“备份计划”下的列表中,可以查看已创建的备份计划。 +1. 查看:显示备份计划的详细信息。 +2. 修改:修改备份计划的配置。修改备份计划的配置后,当前运行的备份任务会先停止,然后按照新的配置重新运行。 +3. 复制:以选中的备份计划为模版,创建新的备份计划。除了数据库和超级表需要用户选择以外,其他配置项和被复制的计划相同。 +4. 删除:删除备份计划。删除备份计划时,可以选择是否删除关联的备份文件。 +5. 指标:查看备份计划的统计指标。 +6. 查看备份点:查看和备份计划关联的所有备份点。 -![backup-zh-03.png](./pic/backup-03-list.png "查看备份计划列表") +## 2.3. 备份文件 -点击“操作”中的“查看”按钮,可以查看备份计划的详细信息。 +在“备份文件”列表中,可以查看备份文件的详细信息。 -![backup-zh-04.png](./pic/backup-04-view.png "查看备份计划详情") +## 2.4. 数据恢复 -### 2.1.3. 修改 +在“备份文件”列表中,选择一个备份点,可以创建一个恢复任务,数据库恢复到指定的时间。 -点击“操作”中的“修改”按钮,可以修改备份计划的配置。 - -![backup-zh-05.png](./pic/backup-05-edit.png "修改备份计划") - -修改备份计划的配置后,当前运行的备份任务会先停止,然后按照新的配置重新运行。 - -### 2.1.4. 复制 - -点击“操作”中的“复制”按钮,可以复制备份计划。 - -![backup-zh-06.png](./pic/backup-06-copy.png "复制备份计划") - -除了数据库和超级表被置为空外,其他配置项和被复制的计划相同。用户点击“确认”后,创建一个新的备份计划。 - -### 2.1.5. 删除 - -在操作中点击关闭按钮,可以停止当前备份计划。点击“操作”中的“删除”按钮,可以删除备份计划。 - -![backup-zh-07.png](./pic/backup-07-del.png "删除备份计划") - -删除备份计划时,可以选择,是否删除关联的备份文件。 - -## 2.2. 备份文件 - -### 2.2.1. 查看 - -在备份计划列表中,选择要一个备份计划。在“备份文件”列中,点击“查看”按钮。可以查看和备份计划的所有备份点。 - -![backup-zh-08.png](./pic/backup-08-files.png "查看备份文件") - -在备份文件列表中,可以查看备份文件的详细信息。 - -![backup-zh-09.png](./pic/backup-09-filelist.png "查看备份文件列表") - -## 2.3. 恢复任务 - -### 2.3.1. 创建 - -在备份文件列表中,点击“操作”中的“恢复”按钮,可以创建一个恢复任务。 - -![backup-zh-10.png](./pic/backup-10-restore-create.png "创建恢复任务") - -在弹出的对话框中,选择使用哪个备份点开始恢复,默认为最早的备份点。点击“确定”后,创建恢复任务,并跳转至“恢复任务”列表。 - -### 2.3.2. 查看 - -在“恢复任务”列表中,可以查看已创建的恢复任务。 - -![backup-zh-11.png](./pic/backup-11-restore-list.png "查看恢复任务列表") - -恢复任务可以终止。点击“操作”中的开关,可以终止当前恢复任务。 +在“恢复任务”列表中,可以查看已创建的恢复任务。恢复任务可以终止。 # 3. 常见错误排查 diff --git a/docs/zh/08-operation/pic/backup-00-concept.png b/docs/zh/08-operation/pic/backup-00-concept.png index 5123b4d540..ce65c3f181 100644 Binary files a/docs/zh/08-operation/pic/backup-00-concept.png and b/docs/zh/08-operation/pic/backup-00-concept.png differ diff --git a/docs/zh/08-operation/pic/backup-01-create.png b/docs/zh/08-operation/pic/backup-01-create.png deleted file mode 100644 index a424c276e5..0000000000 Binary files a/docs/zh/08-operation/pic/backup-01-create.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-02-form.png b/docs/zh/08-operation/pic/backup-02-form.png deleted file mode 100644 index 3ccd81c831..0000000000 Binary files a/docs/zh/08-operation/pic/backup-02-form.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-03-list.png b/docs/zh/08-operation/pic/backup-03-list.png deleted file mode 100644 index 505d6a8040..0000000000 Binary files a/docs/zh/08-operation/pic/backup-03-list.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-04-view.png b/docs/zh/08-operation/pic/backup-04-view.png deleted file mode 100644 index 7bfa699906..0000000000 Binary files a/docs/zh/08-operation/pic/backup-04-view.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-05-edit.png b/docs/zh/08-operation/pic/backup-05-edit.png deleted file mode 100644 index 5ff1204ad5..0000000000 Binary files a/docs/zh/08-operation/pic/backup-05-edit.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-06-copy.png b/docs/zh/08-operation/pic/backup-06-copy.png deleted file mode 100644 index 2ec1ea68d0..0000000000 Binary files a/docs/zh/08-operation/pic/backup-06-copy.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-07-del.png b/docs/zh/08-operation/pic/backup-07-del.png deleted file mode 100644 index e1cf4748bf..0000000000 Binary files a/docs/zh/08-operation/pic/backup-07-del.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-08-files.png b/docs/zh/08-operation/pic/backup-08-files.png deleted file mode 100644 index 07f2184d4f..0000000000 Binary files a/docs/zh/08-operation/pic/backup-08-files.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-09-filelist.png b/docs/zh/08-operation/pic/backup-09-filelist.png deleted file mode 100644 index b963091f36..0000000000 Binary files a/docs/zh/08-operation/pic/backup-09-filelist.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-10-restore-create.png b/docs/zh/08-operation/pic/backup-10-restore-create.png deleted file mode 100644 index e0e22160d0..0000000000 Binary files a/docs/zh/08-operation/pic/backup-10-restore-create.png and /dev/null differ diff --git a/docs/zh/08-operation/pic/backup-11-restore-list.png b/docs/zh/08-operation/pic/backup-11-restore-list.png deleted file mode 100644 index ca1f1b45d5..0000000000 Binary files a/docs/zh/08-operation/pic/backup-11-restore-list.png and /dev/null differ diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index a2be706dc0..7a4401827c 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -389,7 +389,6 @@ typedef struct SStateStore { int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - void (*streamStateSetFillInfo)(SStreamState* pState); void (*streamStateClearExpiredState)(SStreamState* pState); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, @@ -455,7 +454,6 @@ typedef struct SStateStore { int32_t (*streamStateBegin)(SStreamState* pState); void (*streamStateCommit)(SStreamState* pState); void (*streamStateDestroy)(SStreamState* pState, bool remove); - int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst); } SStateStore; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 2179547352..b4e0087b1a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); void streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState, bool remove); -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); int32_t streamStateDelTaskDb(SStreamState* pState); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); @@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); // twa -void streamStateSetFillInfo(SStreamState* pState); void streamStateClearExpiredState(SStreamState* pState); void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f1f5b00e38..f47c308e18 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index a6a3c63a50..709d053414 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -148,7 +148,7 @@ int32_t tfsMkdirRecur(STfs *pTfs, const char *rname); * @return int32_t 0 for success, -1 for failure. */ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId); - +#if 0 /** * @brief check directories exist in tfs. * @@ -158,7 +158,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId); * @return true for exist, false for not exist. */ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId); - +#endif /** * @brief Remove directory at all levels in tfs. * @@ -241,7 +241,7 @@ void tfsBasename(const STfsFile *pFile, char *dest); * @param dest The buffer where dirname will be saved. */ void tfsDirname(const STfsFile *pFile, char *dest); - +#if 0 /** * @brief Get the absolute file name of rname. * @@ -251,7 +251,7 @@ void tfsDirname(const STfsFile *pFile, char *dest); * @param aname absolute file name */ void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname); - +#endif /** * @brief Remove file in tfs. * diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 35ab328815..68d75f9897 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -66,9 +66,9 @@ int32_t s3Begin() { void s3End() { S3_deinitialize(); } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } - +#if 0 static int32_t s3ListBucket(char const *bucketname); - +#endif static void s3DumpCfgByEp(int8_t epIndex) { // clang-format off (void)fprintf(stdout, @@ -291,7 +291,7 @@ static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) { TAOS_RETURN(code); } - +#if 0 static int32_t s3ListBucket(char const *bucketname) { int32_t code = 0; @@ -312,7 +312,7 @@ static int32_t s3ListBucket(char const *bucketname) { TAOS_RETURN(code); } - +#endif typedef struct growbuffer { // The total number of bytes, and the start byte int size; diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index bb12612273..31afb7377e 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -40,6 +40,46 @@ add_test( COMMAND dataformatTest ) +# cosCpTest.cpp +add_executable(cosCpTest "") +target_sources( + cosCpTest + PRIVATE + "cosCpTest.cpp" +) +target_link_libraries(cosCpTest gtest gtest_main util common) +target_include_directories( + cosCpTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosCpTest + COMMAND cosCpTest +) + +if(TD_LINUX) + +# cosTest.cpp +add_executable(cosTest "") +target_sources( + cosTest + PRIVATE + "cosTest.cpp" +) +target_link_libraries(cosTest gtest gtest_main util common) +target_include_directories( + cosTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosTest + COMMAND cosTest +) + +endif() + if (${TD_LINUX}) # tmsg test add_executable(tmsgTest "") @@ -60,4 +100,4 @@ if (${TD_LINUX}) add_custom_command(TARGET tmsgTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${MSG_TBL_FILE} $ ) -endif () \ No newline at end of file +endif () diff --git a/source/common/test/cosCpTest.cpp b/source/common/test/cosCpTest.cpp new file mode 100644 index 0000000000..fc16daa8cc --- /dev/null +++ b/source/common/test/cosCpTest.cpp @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +TEST(testCase, cpOpenCloseRemove) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuild) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoad) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 0); + EXPECT_EQ(cp.parts[0].etag, std::string("")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuildUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + int seq = 1; + char *etags[1] = {"etags-1-xxx"}; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + cos_cp_update(&cp, cp.parts[seq - 1].index, etags[seq - 1], 0); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoadUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 1); + EXPECT_EQ(cp.parts[0].etag, std::string("etags-1-xxx")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} diff --git a/source/common/test/cosTest.cpp b/source/common/test/cosTest.cpp new file mode 100644 index 0000000000..5a6aee52d9 --- /dev/null +++ b/source/common/test/cosTest.cpp @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +int32_t cosInitEnv() { + int32_t code = 0; + bool isBlob = false; + + extern int8_t tsS3Ablob; + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + extern char tsS3BucketName[TSDB_FQDN_LEN]; + + tsS3Ablob = isBlob; + /* + const char *hostname = "endpoint/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + */ + + // const char *hostname = "http://192.168.1.52:9000"; + // const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + // const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + // const char *bucketName = "test-bucket"; + const char *hostname = "192.168.1.52:9000"; + const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + const char *bucketName = "ci-bucket19"; + + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + + // setup s3 env + extern int8_t tsS3EpNum; + extern int8_t tsS3Https[TSDB_MAX_EP_NUM]; + + tsS3EpNum = 1; + tsS3Https[0] = false; + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +TEST(testCase, cosCpPutError) { + int32_t code = 0, lino = 0; + + char const *objectName = "testObject"; + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), -1); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3EvictCache("", 0); + + s3End(); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cosCpPut) { + int32_t code = 0, lino = 0; + + int8_t with_cp = 0; + char *data = nullptr; + + const long objectSize = 65 * 1024 * 1024; + char const *objectName = "cosut.bin"; + const char object_name[] = "cosut.bin"; + + EXPECT_EQ(std::string(object_name), objectName); + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + + { + data = (char *)taosMemoryCalloc(1, objectSize); + if (!data) { + TAOS_CHECK_EXIT(terrno); + } + + for (int i = 0; i < objectSize / 2; ++i) { + data[i * 2 + 1] = 1; + } + + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, objectSize); + GTEST_ASSERT_EQ(n, objectSize); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + + with_cp = 1; + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), objectSize); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3End(); + s3EvictCache("", 0); + + taosMemoryFree(data); + + EXPECT_EQ(taosRemoveFile(path), TSDB_CODE_SUCCESS); + } + + return; + +_exit: + if (data) { + taosMemoryFree(data); + s3End(); + } + + std::cout << "code: " << code << std::endl; +} diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 4fe4333534..68dc981338 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index df6fb17730..41e6c6c2c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp index 33ac774d0c..ba2ac14551 100644 --- a/source/libs/azure/src/td_block_blob_client.cpp +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -38,6 +38,8 @@ TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::strin return newClient; } +TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} +#if 0 TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, const BlobClientOptions& options) : BlobClient(blobUrl, std::move(credential), options) {} @@ -50,8 +52,6 @@ TDBlockBlobClient::TDBlockBlobClient(const std::string& TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options) : BlobClient(blobUrl, options) {} -TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} - TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const { TDBlockBlobClient newClient(*this); if (snapshot.empty()) { @@ -74,47 +74,6 @@ TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId) return newClient; } -Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, - const UploadBlockBlobOptions& options, - const Azure::Core::Context& context) const { - _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; - if (options.TransactionalContentHash.HasValue()) { - if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { - protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; - } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { - protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; - } - } - protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; - protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; - protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; - protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; - protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; - protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; - protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); - protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); - protocolLayerOptions.Tier = options.AccessTier; - protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; - protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; - protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; - protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; - protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; - protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; - if (m_customerProvidedKey.HasValue()) { - protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; - protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; - protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); - } - protocolLayerOptions.EncryptionScope = m_encryptionScope; - if (options.ImmutabilityPolicy.HasValue()) { - protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; - protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; - } - protocolLayerOptions.LegalHold = options.HasLegalHold; - - return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); -} - Azure::Response TDBlockBlobClient::UploadFrom( const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options, const Azure::Core::Context& context) const { @@ -270,6 +229,47 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } +#endif +Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, + const UploadBlockBlobOptions& options, + const Azure::Core::Context& context) const { + _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; + if (options.TransactionalContentHash.HasValue()) { + if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { + protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; + } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { + protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; + } + } + protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; + protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; + protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; + protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; + protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; + protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; + protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); + protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); + protocolLayerOptions.Tier = options.AccessTier; + protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; + protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; + protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; + protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; + protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; + protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; + if (m_customerProvidedKey.HasValue()) { + protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; + protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; + protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); + } + protocolLayerOptions.EncryptionScope = m_encryptionScope; + if (options.ImmutabilityPolicy.HasValue()) { + protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; + protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; + } + protocolLayerOptions.LegalHold = options.HasLegalHold; + + return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); +} Azure::Response TDBlockBlobClient::UploadFrom( const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options, @@ -349,7 +349,7 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } - +#if 0 Azure::Response TDBlockBlobClient::UploadFromUri( const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options, const Azure::Core::Context& context) const { @@ -396,7 +396,7 @@ Azure::Response TDBlockBlobClient::UploadF return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::StageBlock(const std::string& blockId, Azure::Core::IO::BodyStream& content, const StageBlockOptions& options, @@ -419,7 +419,7 @@ Azure::Response TDBlockBlobClient::StageBlock(const st protocolLayerOptions.EncryptionScope = m_encryptionScope; return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::StageBlockFromUri( const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options, const Azure::Core::Context& context) const { @@ -457,7 +457,7 @@ Azure::Response TDBlockBlobClient::StageBlockFr return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::CommitBlockList( const std::vector& blockIds, const CommitBlockListOptions& options, const Azure::Core::Context& context) const { @@ -492,7 +492,7 @@ Azure::Response TDBlockBlobClient::CommitBlockLis return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options, const Azure::Core::Context& context) const { _detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions; @@ -502,6 +502,7 @@ Azure::Response TDBlockBlobClient::GetBlockList(cons return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, _internal::WithReplicaStatus(context)); } +#endif /* Azure::Response TDBlockBlobClient::Query(const std::string& querySqlExpression, const QueryBlobOptions& options, diff --git a/source/libs/azure/test/azExceptionTest.cpp b/source/libs/azure/test/azExceptionTest.cpp new file mode 100644 index 0000000000..a83bb4d8f2 --- /dev/null +++ b/source/libs/azure/test/azExceptionTest.cpp @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +// clang-format off +#include "td_block_blob_client.hpp" +#include "az.h" +// clang-format on + +using namespace Azure::Storage; +using namespace Azure::Storage::Blobs; + +extern int8_t tsS3Enabled; +extern char tsS3BucketName[TSDB_FQDN_LEN]; + +static int32_t azInitEnv() { + int32_t code = 0; + + extern int8_t tsS3EpNum; + + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + + /* TCS parameter format + tsS3Hostname[0] = "/.blob.core.windows.net"; + tsS3AccessKeyId[0] = ""; + tsS3AccessKeySecret[0] = ""; + tsS3BucketName = ""; + */ + + const char *hostname = "/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + + if (hostname[0] != '<') { + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + } else { + const char *accountId = getenv("ablob_account_id"); + if (!accountId) { + return -1; + } + + const char *accountSecret = getenv("ablob_account_secret"); + if (!accountSecret) { + return -1; + } + + const char *containerName = getenv("ablob_container"); + if (!containerName) { + return -1; + } + + TAOS_STRCPY(&tsS3Hostname[0][0], accountId); + TAOS_STRCAT(&tsS3Hostname[0][0], ".blob.core.windows.net"); + TAOS_STRCPY(&tsS3AccessKeyId[0][0], accountId); + TAOS_STRCPY(&tsS3AccessKeySecret[0][0], accountSecret); + TAOS_STRCPY(tsS3BucketName, containerName); + } + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +// TEST(AzTest, DISABLED_InterfaceTest) { +TEST(AzETest, InterfaceTest) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 4096; + char data[size] = {0}; + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + TDBlockBlobClient blobClient = + TDBlockBlobClient::CreateFromConnectionString(std::getenv("ablob_cs"), std::string(tsS3BucketName), object_name); + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + azEnd(); +} diff --git a/source/libs/azure/test/azTest.cpp b/source/libs/azure/test/azTest.cpp index 0459cb5f6a..b57b7ca884 100644 --- a/source/libs/azure/test/azTest.cpp +++ b/source/libs/azure/test/azTest.cpp @@ -199,3 +199,132 @@ TEST(AzTest, InterfaceTest) { azEnd(); } + +// TEST(AzTest, DISABLED_InterfaceTestBig) { +TEST(AzTest, InterfaceTestBig) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 256 * 1024 * 1024 + 1; + char *data = (char *)taosMemoryCalloc(1, size); + if (!data) { + std::cout << "code: " << code << "terrno: " << terrno << std::endl; + + return; + } + + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + taosMemoryFree(data); + + azEnd(); +} diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bc1462176a..2e16b0968a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1986,10 +1986,19 @@ void catalogDestroy(void) { } if (gCtgMgmt.cacheTimer) { - if (taosTmrStop(gCtgMgmt.cacheTimer)) { - qTrace("stop catalog cache timer may failed"); + if (!taosTmrStop(gCtgMgmt.cacheTimer)) { +/* + qDebug("catalog cacheTimer %" PRIuPTR " not stopped", (uintptr_t)gCtgMgmt.cacheTimer); + + while (!taosTmrIsStopped(&gCtgMgmt.cacheTimer)) { + taosMsleep(1); + } +*/ } + + qDebug("catalog cacheTimer %" PRIuPTR " is stopped", (uintptr_t)gCtgMgmt.cacheTimer); gCtgMgmt.cacheTimer = NULL; + taosTmrCleanUp(gCtgMgmt.timer); gCtgMgmt.timer = NULL; } diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index f3a0b04457..a1512f8fd9 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -480,6 +480,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) { dbCache = (SCtgDBCache *)pIter; + CTG_LOCK(CTG_READ, &dbCache->dbLock); + dbFName = taosHashGetKey(pIter, &len); int32_t metaNum = dbCache->tbCache ? taosHashGetSize(dbCache->tbCache) : 0; @@ -509,6 +511,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) { hashMethod, hashPrefix, hashSuffix, vgNum); if (dbCache->vgCache.vgInfo) { + CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock); + int32_t i = 0; void *pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, NULL); while (pVgIter) { @@ -524,6 +528,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) { pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, pVgIter); } + + CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); } if (dbCache->cfgCache.cfgInfo) { @@ -544,6 +550,8 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) { pCfg->schemaless, pCfg->sstTrigger); } + CTG_UNLOCK(CTG_READ, &dbCache->dbLock); + ++i; pIter = taosHashIterate(dbHash, pIter); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index c2889f096b..06eabc09da 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -149,12 +149,13 @@ void ctgTestInitLogFile() { return; } - const char *defaultLogFileNamePrefix = "taoslog"; + const char *defaultLogFileNamePrefix = "catalogTest"; const int32_t maxLogFileNum = 10; tsAsyncLog = 0; qDebugFlag = 159; tmrDebugFlag = 159; + tsNumOfLogLines = 1000000000; TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH); (void)ctgdEnableDebug("api", true); @@ -1839,7 +1840,7 @@ TEST(tableMeta, updateStbMeta) { while (true) { uint64_t n = 0; ASSERT(0 == ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n)); - if (n != 3) { + if (n < 3) { taosMsleep(50); } else { break; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7af64c041d..f922a5e03e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); int32_t uploadCheckpointToS3(const char* id, const char* path); +int32_t deleteCheckpointFile(const char* id, const char* name); +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ebde9fe50e..d3eba382c9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,6 @@ #include "tcs.h" static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); #ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); @@ -230,8 +229,8 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream return code; } -static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, - int32_t transId) { +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId) { int32_t code = 0; int32_t vgId = pTask->pMeta->vgId; int32_t taskLevel = pTask->info.taskLevel; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 89f0ea9e1f..7259c0e49a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) { taosMemoryFreeClear(pState); } -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { - return deleteExpiredCheckPoint(pState->pFileState, mark); -} - void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { @@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); } -void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); } - int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 05edad0f5f..aaff58d1b4 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe } } -int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); - int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); - if (pFileState->searchBuff != NULL) { - deleteHashSortRowBuff(pFileState, pKey); - } - if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { - return TSDB_CODE_SUCCESS; - } - return TSDB_CODE_FAILED; -} - static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { - return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list); -} - int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; int64_t maxCheckPointId = 0; @@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; } -void setFillInfo(SStreamFileState* pFileState) { - pFileState->hasFillCatch = false; -} - void clearExpiredState(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1261,6 +1241,7 @@ _end: } } +#ifdef BUILD_NO_CALL int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -1328,6 +1309,7 @@ _end: } return code; } +#endif int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) { diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index c8297d56b7..c993743dc3 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) extern int8_t tsS3EpNum; tsS3EpNum = 1; - code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); - EXPECT_EQ(code, TSDB_CODE_SUCCESS); + code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream"); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); code = downloadCheckpointByNameS3("123", "/root/download", ""); EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); + + code = deleteCheckpointFile("aaa123", "bbb"); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); } + +TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.checkpointId = 123; + code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + pTask->chkInfo.pActiveInfo->failedId = 223; + code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + SStreamDataBlock block; + block.srcTaskId = 456; + SStreamTask upTask; + upTask = *pTask; + upTask.id.taskId = 456; + streamTaskSetUpstreamInfo(pTask, &upTask); + pTask->chkInfo.pActiveInfo->failedId = 23; + code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + streamTaskSetUpstreamInfo(pTask, &upTask); + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 223; + + STaskCheckpointReadyInfo readyInfo; + readyInfo.upstreamTaskId = 4567; + block.srcTaskId = 4567; + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1); + + initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1); + taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo); + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 1111; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); +} \ No newline at end of file diff --git a/source/libs/tdb/src/db/tdbUtil.c b/source/libs/tdb/src/db/tdbUtil.c index 2249123ef5..14845ed8ee 100644 --- a/source/libs/tdb/src/db/tdbUtil.c +++ b/source/libs/tdb/src/db/tdbUtil.c @@ -66,3 +66,10 @@ int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) { *size = szBytes / szPage; return 0; } + +void tdbCloseDir(TdDirPtr *ppDir) { + int32_t ret = taosCloseDir(ppDir); + if (ret) { + tdbError("failed to close directory, reason:%s", tstrerror(ret)); + } +} diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 993c3ffab8..cd2d4ce57a 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -71,12 +71,7 @@ typedef TdFilePtr tdb_fd_t; #define tdbGetDirEntryName taosGetDirEntryName #define tdbDirEntryBaseName taosDirEntryBaseName -static FORCE_INLINE void tdbCloseDir(TdDirPtr *ppDir) { - int32_t ret = taosCloseDir(ppDir); - if (ret) { - tdbError("failed to close directory, reason:%s", tstrerror(ret)); - } -} +void tdbCloseDir(TdDirPtr *ppDir); #define tdbOsRemove remove #define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL) diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 3c2b67da01..5dd9ce568f 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -16,6 +16,10 @@ #ifndef _TD_TFS_INT_H_ #define _TD_TFS_INT_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "os.h" #include "taosdef.h" @@ -74,6 +78,7 @@ typedef struct STfs { SHashObj *hash; // name to did map } STfs; +int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg); int32_t tfsNewDisk(int32_t level, int32_t id, int8_t disable, const char *dir, STfsDisk **ppDisk); STfsDisk *tfsFreeDisk(STfsDisk *pDisk); int32_t tfsUpdateDiskSize(STfsDisk *pDisk); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index ecc55517b3..5021a6ae39 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -19,7 +19,6 @@ static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg); static int32_t tfsCheck(STfs *pTfs); -static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg); static int32_t tfsFormatDir(char *idir, char *odir); static int32_t tfsGetDiskByName(STfs *pTfs, const char *dir, STfsDisk **ppDisk); static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir); @@ -245,13 +244,13 @@ void tfsDirname(const STfsFile *pFile, char *dest) { tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN); tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN); } - +#if 0 void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) { STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); (void)snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); } - +#endif int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) { @@ -340,7 +339,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { TAOS_RETURN(0); } - +#if 0 bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) { STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); char aname[TMPNAME_LEN]; @@ -348,7 +347,7 @@ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) { (void)snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); return taosDirExist(aname); } - +#endif int32_t tfsRmdir(STfs *pTfs, const char *rname) { if (rname[0] == 0) { TAOS_RETURN(0); @@ -515,7 +514,7 @@ _exit: TAOS_RETURN(code); } -static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { +int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { int32_t code = 0; char dirName[TSDB_FILENAME_LEN] = "\0"; @@ -577,32 +576,32 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { } static int32_t tfsFormatDir(char *idir, char *odir) { + int32_t code = 0, lino = 0; wordexp_t wep = {0}; + int32_t dirLen = 0; + char tmp[PATH_MAX] = {0}; - int32_t code = wordexp(idir, &wep, 0); + code = wordexp(idir, &wep, 0); if (code != 0) { - TAOS_RETURN(TAOS_SYSTEM_ERROR(code)); + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(code)); } - char tmp[PATH_MAX] = {0}; - if (taosRealPath(wep.we_wordv[0], tmp, PATH_MAX) != 0) { - code = TAOS_SYSTEM_ERROR(errno); - wordfree(&wep); - TAOS_RETURN(code); - } + TAOS_CHECK_EXIT(taosRealPath(wep.we_wordv[0], tmp, PATH_MAX)); - int32_t dirLen = strlen(tmp); + dirLen = strlen(tmp); if (dirLen < 0 || dirLen >= TSDB_FILENAME_LEN) { - wordfree(&wep); - code = TSDB_CODE_OUT_OF_RANGE; - fError("failed to mount %s to FS since %s, real path:%s, len:%d", idir, tstrerror(code), tmp, dirLen); - TAOS_RETURN(code); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE); } tstrncpy(odir, tmp, TSDB_FILENAME_LEN); +_exit: wordfree(&wep); - TAOS_RETURN(0); + if (code != 0) { + fError("failed to mount %s to FS at line %d since %s, real path:%s, len:%d", idir, lino, tstrerror(code), tmp, + dirLen); + } + TAOS_RETURN(code); } static int32_t tfsCheck(STfs *pTfs) { diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 2cfcdc6d0a..acc8168538 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -41,13 +41,13 @@ void tfsDestroyTier(STfsTier *pTier) { int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk) { int32_t code = 0; int32_t lino = 0; + int32_t id = 0; STfsDisk *pDisk = NULL; if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) { TAOS_CHECK_GOTO(TSDB_CODE_FS_TOO_MANY_MOUNT, &lino, _exit); } - int32_t id = 0; if (pTier->level == 0) { if (pTier->disks[0] != NULL) { id = pTier->ndisk; diff --git a/source/libs/tfs/test/CMakeLists.txt b/source/libs/tfs/test/CMakeLists.txt index 2fd0836a1d..050811f0f5 100644 --- a/source/libs/tfs/test/CMakeLists.txt +++ b/source/libs/tfs/test/CMakeLists.txt @@ -7,8 +7,13 @@ target_link_libraries( PUBLIC tfs PUBLIC gtest_main ) +target_include_directories( + tfs_test + PUBLIC "${TD_SOURCE_DIR}/include/libs/tfs" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) -# add_test( -# NAME tfs_test -# COMMAND tfs_test -# ) +add_test( + NAME tfs_test + COMMAND tfs_test +) diff --git a/source/libs/tfs/test/tfsTest.cpp b/source/libs/tfs/test/tfsTest.cpp index bb89fbe69f..1570cf173f 100644 --- a/source/libs/tfs/test/tfsTest.cpp +++ b/source/libs/tfs/test/tfsTest.cpp @@ -13,6 +13,7 @@ #include "os.h" #include "tfs.h" +#include "tfsInt.h" class TfsTest : public ::testing::Test { protected: @@ -280,6 +281,9 @@ TEST_F(TfsTest, 04_File) { const STfsFile *pf2 = tfsReaddir(pDir); EXPECT_EQ(pf2, nullptr); + pDir->pDir = taosOpenDir(fulldir); + EXPECT_NE(pDir->pDir, nullptr); + tfsClosedir(pDir); } @@ -744,3 +748,116 @@ TEST_F(TfsTest, 05_MultiDisk) { tfsClose(pTfs); } + +TEST_F(TfsTest, 06_Misc) { + // tfsDisk.c + STfsDisk *pDisk = NULL; + EXPECT_EQ(tfsNewDisk(0, 0, 0, NULL, &pDisk), TSDB_CODE_INVALID_PARA); + EXPECT_NE(tfsNewDisk(0, 0, 0, "", &pDisk), 0); + + STfsDisk disk = {0}; + EXPECT_EQ(tfsUpdateDiskSize(&disk), TSDB_CODE_INVALID_PARA); + + // tfsTier.c + STfsTier tfsTier = {0}; + EXPECT_EQ(taosThreadSpinInit(&tfsTier.lock, 0), 0); + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + + tfsTier.ndisk = 3; + tfsTier.nAvailDisks = 1; + + tfsTier.disks[1] = &disk; + disk.disable = 1; + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + disk.disable = 0; + disk.size.avail = 0; + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + + tfsTier.ndisk = TFS_MAX_DISKS_PER_TIER; + SDiskCfg diskCfg = {0}; + tstrncpy(diskCfg.dir, "testDataDir", TSDB_FILENAME_LEN); + EXPECT_EQ(tfsMountDiskToTier(&tfsTier, &diskCfg, 0), TSDB_CODE_FS_TOO_MANY_MOUNT); + EXPECT_EQ(taosThreadSpinDestroy(&tfsTier.lock), 0); + + // tfs.c + STfs *pTfs = NULL; + EXPECT_EQ(tfsOpen(0, -1, &pTfs), TSDB_CODE_INVALID_PARA); + EXPECT_EQ(tfsOpen(0, 0, &pTfs), TSDB_CODE_INVALID_PARA); + EXPECT_EQ(tfsOpen(0, TFS_MAX_DISKS + 1, &pTfs), TSDB_CODE_INVALID_PARA); + taosMemoryFreeClear(pTfs); + + STfs tfs = {0}; + STfsTier *pTier = &tfs.tiers[0]; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, -1), false); + tfs.nlevel = 2; + pTier->ndisk = 3; + pTier->nAvailDisks = 1; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false); + pTier->disks[0] = &disk; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false); + + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, -1, 0), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, tfs.nlevel + 1, 0), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, -1), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, pTier->ndisk), false); + + EXPECT_EQ(tfsGetDisksAtLevel(&tfs, -1), 0); + EXPECT_EQ(tfsGetDisksAtLevel(&tfs, tfs.nlevel), 0); + + EXPECT_EQ(tfsGetLevel(&tfs), tfs.nlevel); + + for (int32_t l = 0; l < tfs.nlevel; ++l) { + EXPECT_EQ(taosThreadSpinInit(&tfs.tiers[l].lock, 0), 0); + } + + SDiskID diskID = {0}; + disk.size.avail = TFS_MIN_DISK_FREE_SIZE; + EXPECT_EQ(tfsAllocDisk(&tfs, tfs.nlevel, &diskID), 0); + tfs.nlevel = 0; + diskID.level = 0; + EXPECT_EQ(tfsAllocDisk(&tfs, 0, &diskID), 0); + tfs.nlevel = 2; + + diskID.id = 10; + EXPECT_EQ(tfsMkdirAt(&tfs, NULL, diskID), TSDB_CODE_FS_INVLD_CFG); + + EXPECT_NE(tfsMkdirRecurAt(&tfs, NULL, diskID), 0); + + const char *rname = ""; + EXPECT_EQ(tfsRmdir(&tfs, rname), 0); + + EXPECT_EQ(tfsSearch(&tfs, -1, NULL), -1); + EXPECT_EQ(tfsSearch(&tfs, tfs.nlevel, NULL), -1); + + diskCfg.level = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = TFS_MAX_TIERS; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = 0; + diskCfg.primary = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.primary = 2; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.primary = 1; + diskCfg.disable = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.disable = 2; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.disable = 0; + diskCfg.level = 1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = 0; + diskCfg.primary = 0; + tstrncpy(diskCfg.dir, "testDataDir1", TSDB_FILENAME_LEN); + EXPECT_NE(tfsCheckAndFormatCfg(&tfs, &diskCfg), 0); + + TdFilePtr pFile = taosCreateFile("testDataDir1", TD_FILE_CREATE); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + EXPECT_EQ(taosCloseFile(&pFile), 0); + EXPECT_EQ(taosRemoveFile("testDataDir1"), 0); + + for (int32_t l = 0; l < tfs.nlevel; ++l) { + EXPECT_EQ(taosThreadSpinDestroy(&tfs.tiers[l].lock), 0); + } +} diff --git a/source/util/inc/tlogInt.h b/source/util/inc/tlogInt.h new file mode 100644 index 0000000000..1d7f3a063d --- /dev/null +++ b/source/util/inc/tlogInt.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_LOG_INT_H_ +#define _TD_UTIL_LOG_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tlog.h" + +void taosOpenNewSlowLogFile(); +void taosLogObjSetToday(int64_t ts); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_LOG_INT_H_*/ diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a9eef1bfc9..88eccfaffd 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -483,7 +483,7 @@ static int32_t taosOpenNewLogFile() { return 0; } -static void taosOpenNewSlowLogFile() { +void taosOpenNewSlowLogFile() { (void)taosThreadMutexLock(&tsLogObj.logMutex); int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday; if (delta >= 0 && delta < 86400) { @@ -539,6 +539,8 @@ void taosResetLog() { } } +void taosLogObjSetToday(int64_t ts) { tsLogObj.timestampToday = ts; } + static bool taosCheckFileIsOpen(char *logFileName) { TdFilePtr pFile = taosOpenFile(logFileName, TD_FILE_WRITE); if (pFile == NULL) { @@ -619,6 +621,7 @@ static void processLogFileName(const char *logName, int32_t maxFileNum) { } static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) { + int32_t code = 0, lino = 0; #ifdef WINDOWS_STASH /* * always set maxFileNum to 1 @@ -653,38 +656,27 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) { // only an estimate for number of lines int64_t filesize = 0; - if (taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL) != 0) { - (void)printf("\nfailed to fstat log file:%s, reason:%s\n", name, strerror(errno)); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } + TAOS_CHECK_EXIT(taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL)); + tsLogObj.lines = (int32_t)(filesize / 60); if (taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END) < 0) { - TAOS_UNUSED(printf("failed to seek to the end of log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; + TAOS_CHECK_EXIT(terrno); } - (void)snprintf(name, sizeof(name), "==================================================\n"); + (void)snprintf(name, sizeof(name), + "==================================================\n" + " new log file\n" + "==================================================\n"); if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } - (void)snprintf(name, sizeof(name), " new log file \n"); - if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } - (void)snprintf(name, sizeof(name), "==================================================\n"); - if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; + TAOS_CHECK_EXIT(terrno); } +_exit: + taosUnLockLogFile(tsLogObj.logHandle->pFile); + if (code != 0) { + TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code))); + } return 0; } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index cde1392216..ec05a4e415 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -137,6 +137,10 @@ add_test( NAME logTest COMMAND logTest ) +target_include_directories( + logTest + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) IF(COMPILER_SUPPORT_AVX2) MESSAGE(STATUS "AVX2 instructions is ACTIVATED") diff --git a/source/util/test/log.cpp b/source/util/test/log.cpp index ba32d2d639..1899aac2c4 100644 --- a/source/util/test/log.cpp +++ b/source/util/test/log.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include using namespace std; @@ -44,3 +46,96 @@ TEST(log, check_log_refactor) { } taosCloseLog(); } + +extern char *tsLogOutput; +TEST(log, misc) { + // taosInitLog + const char *path = TD_TMP_DIR_PATH "td"; + taosRemoveDir(path); + taosMkDir(path); + tstrncpy(tsLogDir, path, PATH_MAX); + EXPECT_EQ(taosInitLog("taoslog", 1, true), 0); + + taosOpenNewSlowLogFile(); + taosLogObjSetToday(INT64_MIN); + taosPrintSlowLog("slow log test"); + + // test taosInitLogOutput + const char *pLogName = NULL; + tsLogOutput = (char *)taosMemCalloc(1, TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "stdout", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "stderr", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "/dev/null", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tsLogOutput[0] = '#'; + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "/", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "\\", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "testLogOutput", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "testLogOutputDir/testLogOutput", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, ".", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "/..", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tsLogOutput[0] = 0; + + // test taosAssertDebug + tsAssert = false; + taosAssertDebug(true, __FILE__, __LINE__, 0, "test_assert_true_without_core"); + taosAssertDebug(false, __FILE__, __LINE__, 0, "test_assert_false_with_core"); + tsAssert = true; + + // test taosLogCrashInfo, taosReadCrashInfo and taosReleaseCrashLogFile + char nodeType[16] = "nodeType"; + char *pCrashMsg = (char *)taosMemoryCalloc(1, 16); + EXPECT_NE(pCrashMsg, nullptr); + tstrncpy(pCrashMsg, "crashMsg", 16); + +#if !defined(_TD_DARWIN_64) && !defined(WINDOWS) + pid_t pid = taosGetPId(); + EXPECT_EQ(pid > 0, true); + siginfo_t sigInfo = {0}; + sigInfo.si_pid = pid; + taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, &sigInfo); +#else + taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, nullptr); +#endif + + char crashInfo[PATH_MAX] = {0}; + snprintf(crashInfo, sizeof(crashInfo), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType); + + char *pReadMsg = NULL; + int64_t readMsgLen = 0; + TdFilePtr pFile = NULL; + taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile); + EXPECT_NE(pReadMsg, nullptr); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(strncasecmp(pReadMsg, "crashMsg", strlen("crashMsg")), 0); + EXPECT_EQ(taosCloseFile(&pFile), 0); + taosMemoryFreeClear(pReadMsg); + + pFile = taosOpenFile(crashInfo, TD_FILE_WRITE); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(taosWriteFile(pFile, "00000", 1), 1); + EXPECT_EQ(taosCloseFile(&pFile), 0); + + taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile); + EXPECT_EQ(pReadMsg, nullptr); + EXPECT_EQ(pFile, nullptr); + + pFile = taosOpenFile(crashInfo, TD_FILE_WRITE); + EXPECT_NE(pFile, nullptr); + taosReleaseCrashLogFile(pFile, true); + + // clean up + taosRemoveDir(path); + + taosCloseLog(); +}