diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 35ab328815..68d75f9897 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -66,9 +66,9 @@ int32_t s3Begin() { void s3End() { S3_deinitialize(); } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } - +#if 0 static int32_t s3ListBucket(char const *bucketname); - +#endif static void s3DumpCfgByEp(int8_t epIndex) { // clang-format off (void)fprintf(stdout, @@ -291,7 +291,7 @@ static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) { TAOS_RETURN(code); } - +#if 0 static int32_t s3ListBucket(char const *bucketname) { int32_t code = 0; @@ -312,7 +312,7 @@ static int32_t s3ListBucket(char const *bucketname) { TAOS_RETURN(code); } - +#endif typedef struct growbuffer { // The total number of bytes, and the start byte int size; diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index bb12612273..31afb7377e 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -40,6 +40,46 @@ add_test( COMMAND dataformatTest ) +# cosCpTest.cpp +add_executable(cosCpTest "") +target_sources( + cosCpTest + PRIVATE + "cosCpTest.cpp" +) +target_link_libraries(cosCpTest gtest gtest_main util common) +target_include_directories( + cosCpTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosCpTest + COMMAND cosCpTest +) + +if(TD_LINUX) + +# cosTest.cpp +add_executable(cosTest "") +target_sources( + cosTest + PRIVATE + "cosTest.cpp" +) +target_link_libraries(cosTest gtest gtest_main util common) +target_include_directories( + cosTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosTest + COMMAND cosTest +) + +endif() + if (${TD_LINUX}) # tmsg test add_executable(tmsgTest "") @@ -60,4 +100,4 @@ if (${TD_LINUX}) add_custom_command(TARGET tmsgTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${MSG_TBL_FILE} $ ) -endif () \ No newline at end of file +endif () diff --git a/source/common/test/cosCpTest.cpp b/source/common/test/cosCpTest.cpp new file mode 100644 index 0000000000..fc16daa8cc --- /dev/null +++ b/source/common/test/cosCpTest.cpp @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +TEST(testCase, cpOpenCloseRemove) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuild) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoad) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 0); + EXPECT_EQ(cp.parts[0].etag, std::string("")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuildUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + int seq = 1; + char *etags[1] = {"etags-1-xxx"}; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + cos_cp_update(&cp, cp.parts[seq - 1].index, etags[seq - 1], 0); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoadUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 1); + EXPECT_EQ(cp.parts[0].etag, std::string("etags-1-xxx")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} diff --git a/source/common/test/cosTest.cpp b/source/common/test/cosTest.cpp new file mode 100644 index 0000000000..5a6aee52d9 --- /dev/null +++ b/source/common/test/cosTest.cpp @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +int32_t cosInitEnv() { + int32_t code = 0; + bool isBlob = false; + + extern int8_t tsS3Ablob; + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + extern char tsS3BucketName[TSDB_FQDN_LEN]; + + tsS3Ablob = isBlob; + /* + const char *hostname = "endpoint/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + */ + + // const char *hostname = "http://192.168.1.52:9000"; + // const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + // const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + // const char *bucketName = "test-bucket"; + const char *hostname = "192.168.1.52:9000"; + const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + const char *bucketName = "ci-bucket19"; + + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + + // setup s3 env + extern int8_t tsS3EpNum; + extern int8_t tsS3Https[TSDB_MAX_EP_NUM]; + + tsS3EpNum = 1; + tsS3Https[0] = false; + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +TEST(testCase, cosCpPutError) { + int32_t code = 0, lino = 0; + + char const *objectName = "testObject"; + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), -1); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3EvictCache("", 0); + + s3End(); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cosCpPut) { + int32_t code = 0, lino = 0; + + int8_t with_cp = 0; + char *data = nullptr; + + const long objectSize = 65 * 1024 * 1024; + char const *objectName = "cosut.bin"; + const char object_name[] = "cosut.bin"; + + EXPECT_EQ(std::string(object_name), objectName); + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + + { + data = (char *)taosMemoryCalloc(1, objectSize); + if (!data) { + TAOS_CHECK_EXIT(terrno); + } + + for (int i = 0; i < objectSize / 2; ++i) { + data[i * 2 + 1] = 1; + } + + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, objectSize); + GTEST_ASSERT_EQ(n, objectSize); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + + with_cp = 1; + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), objectSize); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3End(); + s3EvictCache("", 0); + + taosMemoryFree(data); + + EXPECT_EQ(taosRemoveFile(path), TSDB_CODE_SUCCESS); + } + + return; + +_exit: + if (data) { + taosMemoryFree(data); + s3End(); + } + + std::cout << "code: " << code << std::endl; +} diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp index 33ac774d0c..ba2ac14551 100644 --- a/source/libs/azure/src/td_block_blob_client.cpp +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -38,6 +38,8 @@ TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::strin return newClient; } +TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} +#if 0 TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, const BlobClientOptions& options) : BlobClient(blobUrl, std::move(credential), options) {} @@ -50,8 +52,6 @@ TDBlockBlobClient::TDBlockBlobClient(const std::string& TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options) : BlobClient(blobUrl, options) {} -TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} - TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const { TDBlockBlobClient newClient(*this); if (snapshot.empty()) { @@ -74,47 +74,6 @@ TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId) return newClient; } -Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, - const UploadBlockBlobOptions& options, - const Azure::Core::Context& context) const { - _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; - if (options.TransactionalContentHash.HasValue()) { - if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { - protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; - } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { - protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; - } - } - protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; - protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; - protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; - protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; - protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; - protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; - protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); - protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); - protocolLayerOptions.Tier = options.AccessTier; - protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; - protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; - protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; - protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; - protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; - protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; - if (m_customerProvidedKey.HasValue()) { - protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; - protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; - protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); - } - protocolLayerOptions.EncryptionScope = m_encryptionScope; - if (options.ImmutabilityPolicy.HasValue()) { - protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; - protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; - } - protocolLayerOptions.LegalHold = options.HasLegalHold; - - return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); -} - Azure::Response TDBlockBlobClient::UploadFrom( const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options, const Azure::Core::Context& context) const { @@ -270,6 +229,47 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } +#endif +Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, + const UploadBlockBlobOptions& options, + const Azure::Core::Context& context) const { + _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; + if (options.TransactionalContentHash.HasValue()) { + if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { + protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; + } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { + protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; + } + } + protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; + protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; + protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; + protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; + protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; + protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; + protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); + protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); + protocolLayerOptions.Tier = options.AccessTier; + protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; + protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; + protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; + protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; + protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; + protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; + if (m_customerProvidedKey.HasValue()) { + protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; + protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; + protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); + } + protocolLayerOptions.EncryptionScope = m_encryptionScope; + if (options.ImmutabilityPolicy.HasValue()) { + protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; + protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; + } + protocolLayerOptions.LegalHold = options.HasLegalHold; + + return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); +} Azure::Response TDBlockBlobClient::UploadFrom( const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options, @@ -349,7 +349,7 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } - +#if 0 Azure::Response TDBlockBlobClient::UploadFromUri( const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options, const Azure::Core::Context& context) const { @@ -396,7 +396,7 @@ Azure::Response TDBlockBlobClient::UploadF return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::StageBlock(const std::string& blockId, Azure::Core::IO::BodyStream& content, const StageBlockOptions& options, @@ -419,7 +419,7 @@ Azure::Response TDBlockBlobClient::StageBlock(const st protocolLayerOptions.EncryptionScope = m_encryptionScope; return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::StageBlockFromUri( const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options, const Azure::Core::Context& context) const { @@ -457,7 +457,7 @@ Azure::Response TDBlockBlobClient::StageBlockFr return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::CommitBlockList( const std::vector& blockIds, const CommitBlockListOptions& options, const Azure::Core::Context& context) const { @@ -492,7 +492,7 @@ Azure::Response TDBlockBlobClient::CommitBlockLis return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options, const Azure::Core::Context& context) const { _detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions; @@ -502,6 +502,7 @@ Azure::Response TDBlockBlobClient::GetBlockList(cons return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, _internal::WithReplicaStatus(context)); } +#endif /* Azure::Response TDBlockBlobClient::Query(const std::string& querySqlExpression, const QueryBlobOptions& options, diff --git a/source/libs/azure/test/azExceptionTest.cpp b/source/libs/azure/test/azExceptionTest.cpp new file mode 100644 index 0000000000..a83bb4d8f2 --- /dev/null +++ b/source/libs/azure/test/azExceptionTest.cpp @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +// clang-format off +#include "td_block_blob_client.hpp" +#include "az.h" +// clang-format on + +using namespace Azure::Storage; +using namespace Azure::Storage::Blobs; + +extern int8_t tsS3Enabled; +extern char tsS3BucketName[TSDB_FQDN_LEN]; + +static int32_t azInitEnv() { + int32_t code = 0; + + extern int8_t tsS3EpNum; + + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + + /* TCS parameter format + tsS3Hostname[0] = "/.blob.core.windows.net"; + tsS3AccessKeyId[0] = ""; + tsS3AccessKeySecret[0] = ""; + tsS3BucketName = ""; + */ + + const char *hostname = "/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + + if (hostname[0] != '<') { + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + } else { + const char *accountId = getenv("ablob_account_id"); + if (!accountId) { + return -1; + } + + const char *accountSecret = getenv("ablob_account_secret"); + if (!accountSecret) { + return -1; + } + + const char *containerName = getenv("ablob_container"); + if (!containerName) { + return -1; + } + + TAOS_STRCPY(&tsS3Hostname[0][0], accountId); + TAOS_STRCAT(&tsS3Hostname[0][0], ".blob.core.windows.net"); + TAOS_STRCPY(&tsS3AccessKeyId[0][0], accountId); + TAOS_STRCPY(&tsS3AccessKeySecret[0][0], accountSecret); + TAOS_STRCPY(tsS3BucketName, containerName); + } + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +// TEST(AzTest, DISABLED_InterfaceTest) { +TEST(AzETest, InterfaceTest) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 4096; + char data[size] = {0}; + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + TDBlockBlobClient blobClient = + TDBlockBlobClient::CreateFromConnectionString(std::getenv("ablob_cs"), std::string(tsS3BucketName), object_name); + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + azEnd(); +} diff --git a/source/libs/azure/test/azTest.cpp b/source/libs/azure/test/azTest.cpp index 0459cb5f6a..b57b7ca884 100644 --- a/source/libs/azure/test/azTest.cpp +++ b/source/libs/azure/test/azTest.cpp @@ -199,3 +199,132 @@ TEST(AzTest, InterfaceTest) { azEnd(); } + +// TEST(AzTest, DISABLED_InterfaceTestBig) { +TEST(AzTest, InterfaceTestBig) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 256 * 1024 * 1024 + 1; + char *data = (char *)taosMemoryCalloc(1, size); + if (!data) { + std::cout << "code: " << code << "terrno: " << terrno << std::endl; + + return; + } + + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + taosMemoryFree(data); + + azEnd(); +} diff --git a/source/libs/tdb/src/db/tdbUtil.c b/source/libs/tdb/src/db/tdbUtil.c index 2249123ef5..14845ed8ee 100644 --- a/source/libs/tdb/src/db/tdbUtil.c +++ b/source/libs/tdb/src/db/tdbUtil.c @@ -66,3 +66,10 @@ int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) { *size = szBytes / szPage; return 0; } + +void tdbCloseDir(TdDirPtr *ppDir) { + int32_t ret = taosCloseDir(ppDir); + if (ret) { + tdbError("failed to close directory, reason:%s", tstrerror(ret)); + } +} diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 993c3ffab8..cd2d4ce57a 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -71,12 +71,7 @@ typedef TdFilePtr tdb_fd_t; #define tdbGetDirEntryName taosGetDirEntryName #define tdbDirEntryBaseName taosDirEntryBaseName -static FORCE_INLINE void tdbCloseDir(TdDirPtr *ppDir) { - int32_t ret = taosCloseDir(ppDir); - if (ret) { - tdbError("failed to close directory, reason:%s", tstrerror(ret)); - } -} +void tdbCloseDir(TdDirPtr *ppDir); #define tdbOsRemove remove #define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL)