diff --git a/include/libs/azure/az.h b/include/libs/azure/az.h new file mode 100644 index 0000000000..55839b0727 --- /dev/null +++ b/include/libs/azure/az.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_AZURE_H_ +#define _TD_AZURE_H_ + +#include "os.h" +#include "tarray.h" +#include "tdef.h" +#include "tlog.h" +#include "tmsg.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t azBegin(); +void azEnd(); +int32_t azCheckCfg(); +int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size); +int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock); +void azDeleteObjectsByPrefix(const char *prefix); + +int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp); +int32_t azGetObjectsByPrefix(const char *prefix, const char *path); +int32_t azGetObjectToFile(const char *object_name, const char *fileName); +int32_t azDeleteObjects(const char *object_name[], int nobject); + +#ifdef __cplusplus +} +#endif + +#endif // _TD_AZURE_H_ diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index 41a1e99521..033582f2c0 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -23,5 +23,5 @@ add_subdirectory(planner) add_subdirectory(qworker) add_subdirectory(geometry) add_subdirectory(command) -#add_subdirectory(azure) +add_subdirectory(azure) add_subdirectory(tcs) diff --git a/source/libs/azure/CMakeLists.txt b/source/libs/azure/CMakeLists.txt new file mode 100644 index 0000000000..1d46a2924b --- /dev/null +++ b/source/libs/azure/CMakeLists.txt @@ -0,0 +1,32 @@ +#if(${TD_LINUX}) +aux_source_directory(src AZ_SRC) + +add_library(az STATIC ${AZ_SRC}) + +if(${BUILD_S3}) + add_definitions(-DUSE_S3) + target_link_libraries( + az + PUBLIC _azure_sdk + PUBLIC crypt + ) +endif() + +target_include_directories( + az + PUBLIC "${TD_SOURCE_DIR}/include/libs/azure" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries( + az + PUBLIC cjson + PUBLIC os + PUBLIC util + PUBLIC common +) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) +#endif(${TD_LINUX}) diff --git a/source/libs/azure/inc/td_block_blob_client.hpp b/source/libs/azure/inc/td_block_blob_client.hpp new file mode 100644 index 0000000000..1b00104821 --- /dev/null +++ b/source/libs/azure/inc/td_block_blob_client.hpp @@ -0,0 +1,260 @@ +#pragma once + +#include "azure/storage/blobs/blob_client.hpp" + +#include +#include +#include + +namespace Azure { +namespace Storage { +namespace Files { +namespace DataLake { +class FileClient; +} +} // namespace Files +} // namespace Storage +} // namespace Azure + +namespace Azure { +namespace Storage { +namespace Blobs { + +/** + * @brief The TDBlockBlobClient allows you to manipulate Azure Storage block blobs. + * + * Block blobs let you upload large blobs efficiently. Block blobs are comprised of blocks, each + * of which is identified by a block ID. 
You create or modify a block blob by writing a set of + * blocks and committing them by their block IDs. Each block can be a different size. + * + * When you upload a block to a blob in your storage account, it is associated with the specified + * block blob, but it does not become part of the blob until you commit a list of blocks that + * includes the new block's ID. New blocks remain in an uncommitted state until they are + * specifically committed or discarded. Writing a block does not update the last modified time of + * an existing blob. + */ +class TDBlockBlobClient final : public BlobClient { + public: + /** + * @brief Initialize a new instance of TDBlockBlobClient. + * + * @param connectionString A connection string includes the authentication information required + * for your application to access data in an Azure Storage account at runtime. + * @param blobContainerName The name of the container containing this blob. + * @param blobName The name of this blob. + * @param options Optional client options that define the transport pipeline policies for + * authentication, retries, etc., that are applied to every request. + * @return A new TDBlockBlobClient instance. + */ + static TDBlockBlobClient CreateFromConnectionString(const std::string& connectionString, + const std::string& blobContainerName, const std::string& blobName, + const BlobClientOptions& options = BlobClientOptions()); + + /** + * @brief Initialize a new instance of TDBlockBlobClient. + * + * @param blobUrl A URL + * referencing the blob that includes the name of the account, the name of the container, and + * the name of the blob. + * @param credential The shared key credential used to sign + * requests. + * @param options Optional client options that define the transport pipeline + * policies for authentication, retries, etc., that are applied to every request. + */ + explicit TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, + const BlobClientOptions& options = BlobClientOptions()); + + /** + * @brief Initialize a new instance of TDBlockBlobClient. + * + * @param blobUrl A URL + * referencing the blob that includes the name of the account, the name of the container, and + * the name of the blob. + * @param credential The token credential used to sign requests. + * @param options Optional client options that define the transport pipeline policies for + * authentication, retries, etc., that are applied to every request. + */ + explicit TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, + const BlobClientOptions& options = BlobClientOptions()); + + /** + * @brief Initialize a new instance of TDBlockBlobClient. + * + * @param blobUrl A URL + * referencing the blob that includes the name of the account, the name of the container, and + * the name of the blob, and possibly also a SAS token. + * @param options Optional client + * options that define the transport pipeline policies for authentication, retries, etc., that + * are applied to every request. + */ + explicit TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options = BlobClientOptions()); + + /** + * @brief Initializes a new instance of the TDBlockBlobClient class with an identical URL + * source but the specified snapshot timestamp. + * + * @param snapshot The snapshot + * identifier. + * @return A new TDBlockBlobClient instance. + * @remarks Pass empty string to remove the snapshot returning the base blob. 
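+ *
+ * A minimal usage sketch (illustrative only: the connection string, container, blob
+ * name, and snapshot timestamp below are placeholders, not values from this codebase):
+ * @code
+ * auto client = TDBlockBlobClient::CreateFromConnectionString(
+ *     connectionString, "my-container", "my-blob");
+ * auto snapshotClient = client.WithSnapshot("2024-01-01T00:00:00.0000000Z");
+ * auto baseClient = snapshotClient.WithSnapshot("");  // empty string: back to the base blob
+ * @endcode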
+ */ + TDBlockBlobClient WithSnapshot(const std::string& snapshot) const; + + /** + * @brief Creates a clone of this instance that references a version ID rather than the base + * blob. + * + * @param versionId The version ID returning a URL to the base blob. + * @return A new TDBlockBlobClient instance. + * @remarks Pass empty string to remove the version ID returning the base blob. + */ + TDBlockBlobClient WithVersionId(const std::string& versionId) const; + + /** + * @brief Creates a new block blob, or updates the content of an existing block blob. Updating + * an existing block blob overwrites any existing metadata on the blob. + * + * @param content A BodyStream containing the content to upload. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A UploadBlockBlobResult describing the state of the updated block blob. + */ + Azure::Response Upload( + Azure::Core::IO::BodyStream& content, const UploadBlockBlobOptions& options = UploadBlockBlobOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Creates a new block blob, or updates the content of an existing block blob. Updating + * an existing block blob overwrites any existing metadata on the blob. + * + * @param buffer A memory buffer containing the content to upload. + * @param bufferSize Size of the memory buffer. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A UploadBlockBlobFromResult describing the state of the updated block blob. + */ + Azure::Response UploadFrom( + const uint8_t* buffer, size_t bufferSize, + const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Creates a new block blob, or updates the content of an existing block blob. Updating + * an existing block blob overwrites any existing metadata on the blob. + * + * @param fileName A file containing the content to upload. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A UploadBlockBlobFromResult describing the state of the updated block blob. + */ + Azure::Response UploadFrom( + const std::string& fileName, const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + Azure::Response UploadFrom( + const std::string& fileName, int64_t offset, int64_t size, + const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Creates a new Block Blob where the contents of the blob are read from a given URL. + * + * @param sourceUri Specifies the URL of the source blob. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A UploadBlockBlobFromUriResult describing the state of the updated block blob. + */ + Azure::Response UploadFromUri( + const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options = UploadBlockBlobFromUriOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Creates a new block as part of a block blob's staging area to be eventually + * committed via the CommitBlockList operation. 
+ * + * @param blockId A valid Base64 string value that identifies the block. Prior to encoding, the + * string must be less than or equal to 64 bytes in size. + * @param content A BodyStream containing the content to upload. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A StageBlockResult describing the state of the updated block. + */ + Azure::Response StageBlock( + const std::string& blockId, Azure::Core::IO::BodyStream& content, + const StageBlockOptions& options = StageBlockOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Creates a new block to be committed as part of a blob where the contents are read from + * the sourceUri. + * + * @param blockId A valid Base64 string value that identifies the block. Prior to encoding, the + * string must be less than or equal to 64 bytes in size. + * @param sourceUri Specifies the uri of the source + * blob. The value may be a uri of up to 2 KB in length that specifies a blob. The source blob + * must either be public or must be authenticated via a shared access signature. If the source + * blob is public, no authentication is required to perform the operation. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A StageBlockFromUriResult describing the state of the updated block blob. + */ + Azure::Response StageBlockFromUri( + const std::string& blockId, const std::string& sourceUri, + const StageBlockFromUriOptions& options = StageBlockFromUriOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Writes a blob by specifying the list of block IDs that make up the blob. In order to + * be written as part of a blob, a block must have been successfully written to the server in a + * prior StageBlock operation. You can call CommitBlockList to update a blob by uploading only + * those blocks that have changed, then committing the new and existing blocks together. You can + * do this by specifying whether to commit a block from the committed block list or from the + * uncommitted block list, or to commit the most recently uploaded version of the block, + * whichever list it may belong to. + * + * @param blockIds Base64 encoded block IDs to indicate that make up the blob. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A CommitBlobBlockListResult describing the state of the updated block blob. + */ + Azure::Response CommitBlockList( + const std::vector& blockIds, const CommitBlockListOptions& options = CommitBlockListOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Retrieves the list of blocks that have been uploaded as part of a block blob. There + * are two block lists maintained for a blob. The Committed Block list has blocks that have been + * successfully committed to a given blob with CommitBlockList. The Uncommitted Block list has + * blocks that have been uploaded for a blob using StageBlock, but that have not yet been + * committed. + * + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A GetBlobBlockListResult describing requested block list. 
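+ *
+ * A sketch of the stage-then-commit flow whose state this call reports (hypothetical:
+ * assumes `client` is a TDBlockBlobClient and `data` a std::vector<uint8_t>; any
+ * fixed-length Base64 string works as a block ID):
+ * @code
+ * std::string blockId = Azure::Core::Convert::Base64Encode(
+ *     std::vector<uint8_t>{'0', '0', '0', '1'});
+ * Azure::Core::IO::MemoryBodyStream content(data.data(), data.size());
+ * client.StageBlock(blockId, content);     // enters the uncommitted block list
+ * client.CommitBlockList({blockId});       // moves the block to the committed list
+ * auto blockList = client.GetBlockList();  // now reports the committed block
+ * @endcode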
+ */ + Azure::Response GetBlockList( + const GetBlockListOptions& options = GetBlockListOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + /** + * @brief Returns the result of a query against the blob. + * + * @param querySqlExpression The query expression in SQL. + * @param options Optional parameters to execute this function. + * @param context Context for cancelling long running operations. + * @return A QueryBlobResult describing the query result. + */ + Azure::Response Query(const std::string& querySqlExpression, + const QueryBlobOptions& options = QueryBlobOptions(), + const Azure::Core::Context& context = Azure::Core::Context()) const; + + explicit TDBlockBlobClient(BlobClient blobClient); + + private: + friend class BlobClient; + friend class Files::DataLake::DataLakeFileClient; +}; + +} // namespace Blobs +} // namespace Storage +} // namespace Azure diff --git a/source/libs/azure/src/avro_parser.cpp b/source/libs/azure/src/avro_parser.cpp new file mode 100644 index 0000000000..485980e007 --- /dev/null +++ b/source/libs/azure/src/avro_parser.cpp @@ -0,0 +1,531 @@ +#if defined(USE_S3) +#include "avro_parser.hpp" + +#include +#include + +#include +#include + +namespace Azure { +namespace Storage { +namespace Blobs { +namespace _detail { + +namespace { +int64_t parseInt(AvroStreamReader::ReaderPos& data) { + uint64_t r = 0; + int nb = 0; + while (true) { + uint8_t c = (*data.BufferPtr)[data.Offset++]; + r = r | ((static_cast(c) & 0x7f) << (nb * 7)); + if (c & 0x80) { + ++nb; + continue; + } + break; + } + return static_cast(r >> 1) ^ -static_cast(r & 0x01); +} + +AvroSchema ParseSchemaFromJsonString(const std::string& jsonSchema) { + const static std::map BuiltinNameSchemaMap = { + {"string", AvroSchema::StringSchema}, {"bytes", AvroSchema::BytesSchema}, {"int", AvroSchema::IntSchema}, + {"long", AvroSchema::LongSchema}, {"float", AvroSchema::FloatSchema}, {"double", AvroSchema::DoubleSchema}, + {"boolean", AvroSchema::BoolSchema}, {"null", AvroSchema::NullSchema}, {"string", AvroSchema::StringSchema}, + }; + std::map nameSchemaMap = BuiltinNameSchemaMap; + + std::function parseSchemaFromJsonObject; + parseSchemaFromJsonObject = [&](const Core::Json::_internal::json& obj) -> AvroSchema { + if (obj.is_string()) { + auto typeName = obj.get(); + return nameSchemaMap.find(typeName)->second; + } else if (obj.is_array()) { + std::vector unionSchemas; + for (const auto& s : obj) { + unionSchemas.push_back(parseSchemaFromJsonObject(s)); + } + return AvroSchema::UnionSchema(std::move(unionSchemas)); + } else if (obj.is_object()) { + if (obj.count("namespace") != 0) { + throw std::runtime_error("Namespace isn't supported yet in Avro schema."); + } + if (obj.count("aliases") != 0) { + throw std::runtime_error("Alias isn't supported yet in Avro schema."); + } + auto typeName = obj["type"].get(); + auto i = nameSchemaMap.find(typeName); + if (i != nameSchemaMap.end()) { + return i->second; + } + if (typeName == "record") { + std::vector> fieldsSchema; + for (const auto& field : obj["fields"]) { + fieldsSchema.push_back( + std::make_pair(field["name"].get(), parseSchemaFromJsonObject(field["type"]))); + } + + const std::string recordName = obj["name"].get(); + auto recordSchema = AvroSchema::RecordSchema(recordName, std::move(fieldsSchema)); + nameSchemaMap.insert(std::make_pair(recordName, recordSchema)); + return recordSchema; + } else if (typeName == "enum") { + throw std::runtime_error("Enum type isn't supported yet in Avro schema."); + } else if (typeName == 
"array") { + return AvroSchema::ArraySchema(parseSchemaFromJsonObject(obj["items"])); + } else if (typeName == "map") { + return AvroSchema::MapSchema(parseSchemaFromJsonObject(obj["items"])); + } else if (typeName == "fixed") { + const std::string fixedName = obj["name"].get(); + auto fixedSchema = AvroSchema::FixedSchema(fixedName, obj["size"].get()); + nameSchemaMap.insert(std::make_pair(fixedName, fixedSchema)); + return fixedSchema; + } else { + throw std::runtime_error("Unrecognized type " + typeName + " in Avro schema."); + } + } + AZURE_UNREACHABLE_CODE(); + }; + + auto jsonRoot = Core::Json::_internal::json::parse(jsonSchema.begin(), jsonSchema.end()); + return parseSchemaFromJsonObject(jsonRoot); +} +} // namespace + +int64_t AvroStreamReader::ParseInt(const Core::Context& context) { + uint64_t r = 0; + int nb = 0; + while (true) { + Preload(1, context); + uint8_t c = m_streambuffer[m_pos.Offset++]; + + r = r | ((static_cast(c) & 0x7f) << (nb * 7)); + if (c & 0x80) { + ++nb; + continue; + } + break; + } + return static_cast(r >> 1) ^ -static_cast(r & 0x01); +} + +void AvroStreamReader::Advance(size_t n, const Core::Context& context) { + Preload(n, context); + m_pos.Offset += n; +} + +size_t AvroStreamReader::Preload(size_t n, const Core::Context& context) { + size_t oldAvailable = AvailableBytes(); + while (true) { + size_t newAvailable = TryPreload(n, context); + if (newAvailable >= n) { + return newAvailable; + } + if (oldAvailable == newAvailable) { + throw std::runtime_error("Unexpected EOF of Avro stream."); + } + oldAvailable = newAvailable; + } + AZURE_UNREACHABLE_CODE(); +} + +size_t AvroStreamReader::TryPreload(size_t n, const Core::Context& context) { + size_t availableBytes = AvailableBytes(); + if (availableBytes >= n) { + return availableBytes; + } + const size_t MinRead = 4096; + size_t tryReadSize = (std::max)(n, MinRead); + size_t currSize = m_streambuffer.size(); + m_streambuffer.resize(m_streambuffer.size() + tryReadSize); + size_t actualReadSize = m_stream->Read(m_streambuffer.data() + currSize, tryReadSize, context); + m_streambuffer.resize(currSize + actualReadSize); + return AvailableBytes(); +} + +void AvroStreamReader::Discard() { + constexpr size_t MinimumReleaseMemory = 128 * 1024; + if (m_pos.Offset < MinimumReleaseMemory) { + return; + } + const size_t availableBytes = AvailableBytes(); + std::memmove(&m_streambuffer[0], &m_streambuffer[m_pos.Offset], availableBytes); + m_streambuffer.resize(availableBytes); + m_pos.Offset = 0; +} + +const AvroSchema AvroSchema::StringSchema(AvroDatumType::String); +const AvroSchema AvroSchema::BytesSchema(AvroDatumType::Bytes); +const AvroSchema AvroSchema::IntSchema(AvroDatumType::Int); +const AvroSchema AvroSchema::LongSchema(AvroDatumType::Long); +const AvroSchema AvroSchema::FloatSchema(AvroDatumType::Float); +const AvroSchema AvroSchema::DoubleSchema(AvroDatumType::Double); +const AvroSchema AvroSchema::BoolSchema(AvroDatumType::Bool); +const AvroSchema AvroSchema::NullSchema(AvroDatumType::Null); + +AvroSchema AvroSchema::RecordSchema(std::string name, + const std::vector>& fieldsSchema) { + AvroSchema recordSchema(AvroDatumType::Record); + recordSchema.m_name = std::move(name); + recordSchema.m_status = std::make_shared(); + for (auto& i : fieldsSchema) { + recordSchema.m_status->m_keys.push_back(i.first); + recordSchema.m_status->m_schemas.push_back(i.second); + } + return recordSchema; +} + +AvroSchema AvroSchema::ArraySchema(AvroSchema elementSchema) { + AvroSchema arraySchema(AvroDatumType::Array); + 
arraySchema.m_status = std::make_shared(); + arraySchema.m_status->m_schemas.push_back(std::move(elementSchema)); + return arraySchema; +} + +AvroSchema AvroSchema::MapSchema(AvroSchema elementSchema) { + AvroSchema mapSchema(AvroDatumType::Map); + mapSchema.m_status = std::make_shared(); + mapSchema.m_status->m_schemas.push_back(std::move(elementSchema)); + return mapSchema; +} + +AvroSchema AvroSchema::UnionSchema(std::vector schemas) { + AvroSchema unionSchema(AvroDatumType::Union); + unionSchema.m_status = std::make_shared(); + unionSchema.m_status->m_schemas = std::move(schemas); + return unionSchema; +} + +AvroSchema AvroSchema::FixedSchema(std::string name, int64_t size) { + AvroSchema fixedSchema(AvroDatumType::Fixed); + fixedSchema.m_name = std::move(name); + fixedSchema.m_status = std::make_shared(); + fixedSchema.m_status->m_size = size; + return fixedSchema; +} + +void AvroDatum::Fill(AvroStreamReader& reader, const Core::Context& context) { + m_data = reader.m_pos; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + int64_t stringSize = reader.ParseInt(context); + reader.Advance(static_cast(stringSize), context); + } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || + m_schema.Type() == AvroDatumType::Enum) { + reader.ParseInt(context); + } else if (m_schema.Type() == AvroDatumType::Float) { + reader.Advance(4, context); + } else if (m_schema.Type() == AvroDatumType::Double) { + reader.Advance(8, context); + } else if (m_schema.Type() == AvroDatumType::Bool) { + reader.Advance(1, context); + } else if (m_schema.Type() == AvroDatumType::Null) { + reader.Advance(0, context); + } else if (m_schema.Type() == AvroDatumType::Record) { + for (const auto& s : m_schema.FieldSchemas()) { + AvroDatum(s).Fill(reader, context); + } + } else if (m_schema.Type() == AvroDatumType::Array) { + while (true) { + int64_t numElementsInBlock = reader.ParseInt(context); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = reader.ParseInt(context); + reader.Advance(static_cast(blockSize), context); + } else { + for (auto i = 0; i < numElementsInBlock; ++i) { + AvroDatum(m_schema.ItemSchema()).Fill(reader, context); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Map) { + while (true) { + int64_t numElementsInBlock = reader.ParseInt(context); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = reader.ParseInt(context); + reader.Advance(static_cast(blockSize), context); + } else { + for (int64_t i = 0; i < numElementsInBlock; ++i) { + AvroDatum(AvroSchema::StringSchema).Fill(reader, context); + AvroDatum(m_schema.ItemSchema()).Fill(reader, context); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = reader.ParseInt(context); + AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(reader, context); + } else if (m_schema.Type() == AvroDatumType::Fixed) { + reader.Advance(m_schema.Size(), context); + } else { + AZURE_UNREACHABLE_CODE(); + } +} + +void AvroDatum::Fill(AvroStreamReader::ReaderPos& data) { + m_data = data; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + int64_t stringSize = parseInt(data); + data.Offset += static_cast(stringSize); + } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || + m_schema.Type() == AvroDatumType::Enum) { + parseInt(data); + } else if (m_schema.Type() == 
AvroDatumType::Float) { + data.Offset += 4; + } else if (m_schema.Type() == AvroDatumType::Double) { + data.Offset += 8; + } else if (m_schema.Type() == AvroDatumType::Bool) { + data.Offset += 1; + } else if (m_schema.Type() == AvroDatumType::Null) { + data.Offset += 0; + } else if (m_schema.Type() == AvroDatumType::Record) { + for (const auto& s : m_schema.FieldSchemas()) { + AvroDatum(s).Fill(data); + } + } else if (m_schema.Type() == AvroDatumType::Array) { + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = parseInt(data); + data.Offset += static_cast(blockSize); + } else { + for (auto i = 0; i < numElementsInBlock; ++i) { + AvroDatum(m_schema.ItemSchema()).Fill(data); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Map) { + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = parseInt(data); + data.Offset += static_cast(blockSize); + } else { + for (int64_t i = 0; i < numElementsInBlock; ++i) { + AvroDatum(AvroSchema::StringSchema).Fill(data); + AvroDatum(m_schema.ItemSchema()).Fill(data); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = parseInt(data); + AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(data); + } else if (m_schema.Type() == AvroDatumType::Fixed) { + data.Offset += m_schema.Size(); + } else { + AZURE_UNREACHABLE_CODE(); + } +} + +template <> +AvroDatum::StringView AvroDatum::Value() const { + auto data = m_data; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + const int64_t length = parseInt(data); + const uint8_t* start = &(*data.BufferPtr)[data.Offset]; + StringView ret{start, static_cast(length)}; + data.Offset += static_cast(length); + return ret; + } + if (m_schema.Type() == AvroDatumType::Fixed) { + const size_t fixedSize = m_schema.Size(); + const uint8_t* start = &(*data.BufferPtr)[data.Offset]; + StringView ret{start, fixedSize}; + data.Offset += fixedSize; + return ret; + } + AZURE_UNREACHABLE_CODE(); +} + +template <> +std::string AvroDatum::Value() const { + auto stringView = Value(); + return std::string(stringView.Data, stringView.Data + stringView.Length); +} + +template <> +std::vector AvroDatum::Value() const { + auto stringView = Value(); + return std::vector(stringView.Data, stringView.Data + stringView.Length); +} + +template <> +int64_t AvroDatum::Value() const { + auto data = m_data; + return parseInt(data); +} + +template <> +int32_t AvroDatum::Value() const { + return static_cast(Value()); +} + +template <> +bool AvroDatum::Value() const { + return Value(); +} + +template <> +std::nullptr_t AvroDatum::Value() const { + return nullptr; +} + +template <> +AvroRecord AvroDatum::Value() const { + auto data = m_data; + + AvroRecord r; + r.m_keys = &m_schema.FieldNames(); + for (const auto& schema : m_schema.FieldSchemas()) { + auto datum = AvroDatum(schema); + datum.Fill(data); + r.m_values.push_back(std::move(datum)); + } + + return r; +} + +template <> +AvroMap AvroDatum::Value() const { + auto data = m_data; + + AvroMap m; + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } + if (numElementsInBlock < 0) { + numElementsInBlock = -numElementsInBlock; + parseInt(data); + } + for (int64_t i = 0; i < numElementsInBlock; ++i) { + auto keyDatum = 
AvroDatum(AvroSchema::StringSchema); + keyDatum.Fill(data); + auto valueDatum = AvroDatum(m_schema.ItemSchema()); + valueDatum.Fill(data); + m[keyDatum.Value()] = valueDatum; + } + } + return m; +} + +template <> +AvroDatum AvroDatum::Value() const { + auto data = m_data; + if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = parseInt(data); + auto datum = AvroDatum(m_schema.FieldSchemas()[static_cast(i)]); + datum.Fill(data); + return datum; + } + AZURE_UNREACHABLE_CODE(); +} + +AvroObjectContainerReader::AvroObjectContainerReader(Core::IO::BodyStream& stream) + : m_reader(std::make_unique(stream)) {} + +AvroDatum AvroObjectContainerReader::NextImpl(const AvroSchema* schema, const Core::Context& context) { + AZURE_ASSERT_FALSE(m_eof); + static const auto SyncMarkerSchema = AvroSchema::FixedSchema("Sync", 16); + if (!schema) { + static AvroSchema FileHeaderSchema = []() { + std::vector> fieldsSchema; + fieldsSchema.push_back(std::make_pair("magic", AvroSchema::FixedSchema("Magic", 4))); + fieldsSchema.push_back(std::make_pair("meta", AvroSchema::MapSchema(AvroSchema::BytesSchema))); + fieldsSchema.push_back(std::make_pair("sync", SyncMarkerSchema)); + return AvroSchema::RecordSchema("org.apache.avro.file.Header", std::move(fieldsSchema)); + }(); + auto fileHeaderDatum = AvroDatum(FileHeaderSchema); + fileHeaderDatum.Fill(*m_reader, context); + auto fileHeader = fileHeaderDatum.Value(); + if (fileHeader.Field("magic").Value() != "Obj\01") { + throw std::runtime_error("Invalid Avro object container magic."); + } + AvroMap meta = fileHeader.Field("meta").Value(); + std::string objectSchemaJson = meta["avro.schema"].Value(); + std::string codec = "null"; + if (meta.count("avro.codec") != 0) { + codec = meta["avro.codec"].Value(); + } + if (codec != "null") { + throw std::runtime_error("Unsupported Avro codec: " + codec); + } + m_syncMarker = fileHeader.Field("sync").Value(); + m_objectSchema = std::make_unique(ParseSchemaFromJsonString(objectSchemaJson)); + schema = m_objectSchema.get(); + } + + if (m_remainingObjectInCurrentBlock == 0) { + m_reader->Discard(); + m_remainingObjectInCurrentBlock = m_reader->ParseInt(context); + int64_t ObjectsSize = m_reader->ParseInt(context); + m_reader->Preload(static_cast(ObjectsSize), context); + } + + auto objectDatum = AvroDatum(*m_objectSchema); + objectDatum.Fill(*m_reader, context); + if (--m_remainingObjectInCurrentBlock == 0) { + auto markerDatum = AvroDatum(SyncMarkerSchema); + markerDatum.Fill(*m_reader, context); + auto marker = markerDatum.Value(); + if (marker != m_syncMarker) { + throw std::runtime_error("Sync marker doesn't match."); + } + m_eof = m_reader->TryPreload(1, context) == 0; + } + return objectDatum; +} + +size_t AvroStreamParser::OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) { + if (m_parserBuffer.Length != 0) { + size_t bytesToCopy = (std::min)(m_parserBuffer.Length, count); + std::memcpy(buffer, m_parserBuffer.Data, bytesToCopy); + m_parserBuffer.Data += bytesToCopy; + m_parserBuffer.Length -= bytesToCopy; + return bytesToCopy; + } + while (!m_parser.End()) { + auto datum = m_parser.Next(context); + if (datum.Schema().Type() == AvroDatumType::Union) { + datum = datum.Value(); + } + if (datum.Schema().Type() != AvroDatumType::Record) { + continue; + } + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.resultData") { + auto record = datum.Value(); + auto dataDatum = record.Field("data"); + m_parserBuffer = dataDatum.Value(); + return OnRead(buffer, count, context); + 
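+ // The recursive call above re-enters OnRead() with m_parserBuffer now non-empty, so
+ // the buffered-copy fast path at the top of this function serves the row data.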
} + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.progress" && m_progressCallback) { + auto record = datum.Value(); + auto bytesScanned = record.Field("bytesScanned").Value(); + auto totalBytes = record.Field("totalBytes").Value(); + m_progressCallback(bytesScanned, totalBytes); + } + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.error" && m_errorCallback) { + auto record = datum.Value(); + BlobQueryError e; + e.Name = record.Field("name").Value(); + e.Description = record.Field("description").Value(); + e.IsFatal = record.Field("fatal").Value(); + e.Position = record.Field("position").Value(); + m_errorCallback(std::move(e)); + } + } + return 0; +} +} // namespace _detail +} // namespace Blobs +} // namespace Storage +} // namespace Azure + +#endif diff --git a/source/libs/azure/src/avro_parser.hpp b/source/libs/azure/src/avro_parser.hpp new file mode 100644 index 0000000000..275d073c85 --- /dev/null +++ b/source/libs/azure/src/avro_parser.hpp @@ -0,0 +1,198 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include "azure/storage/blobs/blob_options.hpp" + +#include + +#include +#include +#include + +namespace Azure { namespace Storage { namespace Blobs { namespace _detail { + enum class AvroDatumType + { + String, + Bytes, + Int, + Long, + Float, + Double, + Bool, + Null, + Record, + Enum, + Array, + Map, + Union, + Fixed, + }; + + class AvroStreamReader final { + public: + // position of a vector that lives through vector resizing + struct ReaderPos final + { + const std::vector* BufferPtr = nullptr; + size_t Offset = 0; + }; + explicit AvroStreamReader(Core::IO::BodyStream& stream) + : m_stream(&stream), m_pos{&m_streambuffer, 0} + { + } + AvroStreamReader(const AvroStreamReader&) = delete; + AvroStreamReader& operator=(const AvroStreamReader&) = delete; + + int64_t ParseInt(const Core::Context& context); + void Advance(size_t n, const Core::Context& context); + // Read at least n bytes from m_stream and append data to m_streambuffer. 
Return number of bytes + // available in m_streambuffer; + size_t Preload(size_t n, const Core::Context& context); + size_t TryPreload(size_t n, const Core::Context& context); + // discards data that's before m_pos + void Discard(); + + private: + size_t AvailableBytes() const { return m_streambuffer.size() - m_pos.Offset; } + + private: + Core::IO::BodyStream* m_stream; + std::vector m_streambuffer; + ReaderPos m_pos; + + friend class AvroDatum; + }; + + class AvroSchema final { + public: + static const AvroSchema StringSchema; + static const AvroSchema BytesSchema; + static const AvroSchema IntSchema; + static const AvroSchema LongSchema; + static const AvroSchema FloatSchema; + static const AvroSchema DoubleSchema; + static const AvroSchema BoolSchema; + static const AvroSchema NullSchema; + static AvroSchema RecordSchema( + std::string name, + const std::vector>& fieldsSchema); + static AvroSchema ArraySchema(AvroSchema elementSchema); + static AvroSchema MapSchema(AvroSchema elementSchema); + static AvroSchema UnionSchema(std::vector schemas); + static AvroSchema FixedSchema(std::string name, int64_t size); + + const std::string& Name() const { return m_name; } + AvroDatumType Type() const { return m_type; } + const std::vector& FieldNames() const { return m_status->m_keys; } + AvroSchema ItemSchema() const { return m_status->m_schemas[0]; } + const std::vector& FieldSchemas() const { return m_status->m_schemas; } + size_t Size() const { return static_cast(m_status->m_size); } + + private: + explicit AvroSchema(AvroDatumType type) : m_type(type) {} + + private: + AvroDatumType m_type; + std::string m_name; + + struct SharedStatus + { + std::vector m_keys; + std::vector m_schemas; + int64_t m_size = 0; + }; + std::shared_ptr m_status; + }; + + class AvroDatum final { + public: + AvroDatum() : m_schema(AvroSchema::NullSchema) {} + explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {} + + void Fill(AvroStreamReader& reader, const Core::Context& context); + void Fill(AvroStreamReader::ReaderPos& data); + + const AvroSchema& Schema() const { return m_schema; } + + template T Value() const; + struct StringView + { + const uint8_t* Data = nullptr; + size_t Length = 0; + }; + + private: + AvroSchema m_schema; + AvroStreamReader::ReaderPos m_data; + }; + + using AvroMap = std::map; + + class AvroRecord final { + public: + bool HasField(const std::string& key) const { return FindField(key) != m_keys->size(); } + const AvroDatum& Field(const std::string& key) const { return m_values.at(FindField(key)); } + AvroDatum& Field(const std::string& key) { return m_values.at(FindField(key)); } + const AvroDatum& FieldAt(size_t i) const { return m_values.at(i); } + AvroDatum& FieldAt(size_t i) { return m_values.at(i); } + + private: + size_t FindField(const std::string& key) const + { + auto i = find(m_keys->begin(), m_keys->end(), key); + return i - m_keys->begin(); + } + const std::vector* m_keys = nullptr; + std::vector m_values; + + friend class AvroDatum; + }; + + class AvroObjectContainerReader final { + public: + explicit AvroObjectContainerReader(Core::IO::BodyStream& stream); + + bool End() const { return m_eof; } + // Calling Next() will invalidates the previous AvroDatum returned by this function and all + // AvroDatums propagated from there. 
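+ // A typical read loop (sketch; `stream` and `context` are assumed to be a valid
+ // Core::IO::BodyStream and Core::Context supplied by the caller):
+ //   AvroObjectContainerReader reader(stream);
+ //   while (!reader.End()) {
+ //     AvroDatum datum = reader.Next(context);
+ //     ...  // consume datum fully before the next call invalidates it
+ //   }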
+ AvroDatum Next(const Core::Context& context) { return NextImpl(m_objectSchema.get(), context); } + + private: + AvroDatum NextImpl(const AvroSchema* schema, const Core::Context& context); + + private: + std::unique_ptr m_reader; + std::unique_ptr m_objectSchema; + std::string m_syncMarker; + int64_t m_remainingObjectInCurrentBlock = 0; + bool m_eof = false; + }; + + class AvroStreamParser final : public Core::IO::BodyStream { + public: + explicit AvroStreamParser( + std::unique_ptr inner, + std::function progressCallback, + std::function errorCallback) + : m_inner(std::move(inner)), m_parser(*m_inner), + m_progressCallback(std::move(progressCallback)), m_errorCallback(std::move(errorCallback)) + { + } + + int64_t Length() const override { return -1; } + void Rewind() override { this->m_inner->Rewind(); } + + private: + size_t OnRead(uint8_t* buffer, size_t count, const Azure::Core::Context& context) override; + + private: + std::unique_ptr m_inner; + AvroObjectContainerReader m_parser; + std::function m_progressCallback; + std::function m_errorCallback; + AvroDatum::StringView m_parserBuffer; + }; + +}}}} // namespace Azure::Storage::Blobs::_detail diff --git a/source/libs/azure/src/az.cpp b/source/libs/azure/src/az.cpp new file mode 100644 index 0000000000..83cba1c877 --- /dev/null +++ b/source/libs/azure/src/az.cpp @@ -0,0 +1,402 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define ALLOW_FORBID_FUNC + +#include "az.h" + +#include "os.h" +#include "taoserror.h" +#include "tglobal.h" + +#if defined(USE_S3) + +#include +#include +#include "td_block_blob_client.hpp" + +// Add appropriate using namespace directives +using namespace Azure::Storage; +using namespace Azure::Storage::Blobs; + +extern char tsS3Hostname[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; +extern char tsS3BucketName[TSDB_FQDN_LEN]; + +extern int8_t tsS3Enabled; +extern int8_t tsS3EpNum; + +int32_t azBegin() { return TSDB_CODE_SUCCESS; } + +void azEnd() {} + +static void azDumpCfgByEp(int8_t epIndex) { + // clang-format off + (void)fprintf(stdout, + "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n" + // "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n", + "hostName", tsS3Hostname[epIndex], + "bucketName", tsS3BucketName, + "protocol", "https only", + //"uristyle", (uriStyleG[epIndex] == S3UriStyleVirtualHost ? 
"virtualhost" : "path"), + "accessKey", tsS3AccessKeyId[epIndex], + "accessKeySecret", tsS3AccessKeySecret[epIndex]); + // clang-format on +} + +static int32_t azListBucket(char const *bucketname) { + int32_t code = 0; + const std::string delimiter = "/"; + std::string accountName = tsS3AccessKeyId[0]; + std::string accountKey = tsS3AccessKeySecret[0]; + std::string accountURL = tsS3Hostname[0]; + accountURL = "https://" + accountURL; + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = bucketname; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + Azure::Storage::Blobs::ListBlobsOptions options; + options.Prefix = "s3"; + + (void)fprintf(stderr, "objects:\n"); + // std::set listBlobs; + for (auto pageResult = containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) { + for (const auto &blob : pageResult.Blobs) { + (void)fprintf(stderr, "%s\n", blob.Name.c_str()); + } + } + } catch (const Azure::Core::RequestFailedException &e) { + uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast(e.StatusCode), + e.ReasonPhrase.c_str()); + // uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TAOS_SYSTEM_ERROR(EIO))); + + code = TAOS_SYSTEM_ERROR(EIO); + TAOS_RETURN(code); + } + + TAOS_RETURN(code); +} + +int32_t azCheckCfg() { + int32_t code = 0, lino = 0; + int8_t i = 0; + + // for (; i < tsS3EpNum; i++) { + (void)fprintf(stdout, "test s3 ep (%d/%d):\n", i + 1, tsS3EpNum); + // s3DumpCfgByEp(i); + azDumpCfgByEp(0); + + // test put + char testdata[17] = "0123456789abcdef"; + const char *objectname[] = {"s3test.txt"}; + char path[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]); + } + + uint8_t *pBlock = NULL; + int c_offset = 10; + int c_len = 6; + char buf[7] = {0}; + + TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (!fp) { + (void)fprintf(stderr, "failed to open test file: %s.\n", path); + // uError("ERROR: %s Failed to open %s", __func__, path); + TAOS_CHECK_GOTO(terrno, &lino, _next); + } + if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { + (void)fprintf(stderr, "failed to write test file: %s.\n", path); + TAOS_CHECK_GOTO(terrno, &lino, _next); + } + if (taosFsyncFile(fp) < 0) { + (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); + TAOS_CHECK_GOTO(terrno, &lino, _next); + } + (void)taosCloseFile(&fp); + + (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); + code = azPutObjectFromFileOffset(path, objectname[0], 0, 16); + if (code != 0) { + (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _next); + } + (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); + + // list buckets + (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", 
tsS3BucketName); + // code = s3ListBucketByEp(tsS3BucketName, i); + code = azListBucket(tsS3BucketName); + if (code != 0) { + (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); + TAOS_CHECK_GOTO(code, &lino, _next); + } + (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); + + // test range get + (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); + code = azGetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock); + if (code != 0) { + (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _next); + } + + (void)memcpy(buf, pBlock, c_len); + taosMemoryFree(pBlock); + (void)fprintf(stderr, "object content: %s\n", buf); + (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + + // delete test object + (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]); + // code = azDeleteObjectsByPrefix(objectname[0]); + azDeleteObjectsByPrefix(objectname[0]); + /* + if (code != 0) { + (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _next); + } + */ + (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); + +_next: + if (fp) { + (void)taosCloseFile(&fp); + } + + if (TSDB_CODE_SUCCESS != code) { + (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); + } + + (void)fprintf(stdout, "=================================================================\n"); + //} + + // azEnd(); + + TAOS_RETURN(code); +} + +int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { + int32_t code = 0; + + std::string endpointUrl = tsS3Hostname[0]; // GetEndpointUrl(); + std::string accountName = tsS3AccessKeyId[0]; // GetAccountName(); + std::string accountKey = tsS3AccessKeySecret[0]; // GetAccountKey(); + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + std::string accountURL = tsS3Hostname[0]; + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + accountURL = "https://" + accountURL; + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = tsS3BucketName; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + // Create the container if it does not exist + // std::cout << "Creating container: " << containerName << std::endl; + // containerClient.CreateIfNotExists(); + + std::string blobName = "blob.txt"; + uint8_t blobContent[] = "Hello Azure!"; + // Create the block blob client + // BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName); + // TDBlockBlobClient blobClient(containerClient.GetBlobClient(blobName)); + TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name)); + + // Upload the blob + // std::cout << "Uploading blob: " << blobName << std::endl; + // blobClient.UploadFrom(blobContent, sizeof(blobContent)); + blobClient.UploadFrom(file, offset, size); + //(void)_azUploadFrom(blobClient, file, offset, size); + /* + auto blockBlobClient = BlockBlobClient(endpointUrl, sharedKeyCredential); + + // Create some data to upload into the blob. 
+ std::vector data = {1, 2, 3, 4}; + Azure::Core::IO::MemoryBodyStream stream(data); + + Azure::Response response = blockBlobClient.Upload(stream); + + Models::UploadBlockBlobResult model = response.Value; + std::cout << "Last modified date of uploaded blob: " << model.LastModified.ToString() + << std::endl; + */ + } catch (const Azure::Core::RequestFailedException &e) { + /* + std::cout << "Status Code: " << static_cast(e.StatusCode) << ", Reason Phrase: " << e.ReasonPhrase + << std::endl; + std::cout << e.what() << std::endl; + */ + code = TAOS_SYSTEM_ERROR(EIO); + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + TAOS_RETURN(code); + } + + TAOS_RETURN(code); +} + +int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + int32_t code = TSDB_CODE_SUCCESS; + std::string accountName = tsS3AccessKeyId[0]; + std::string accountKey = tsS3AccessKeySecret[0]; + std::string accountURL = tsS3Hostname[0]; + accountURL = "https://" + accountURL; + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = tsS3BucketName; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name)); + + uint8_t *buf = (uint8_t *)taosMemoryCalloc(1, size); + if (!buf) { + return terrno; + } + + Blobs::DownloadBlobToOptions options; + // options.TransferOptions.Concurrency = concurrency; + // if (offset.HasValue() || length.HasValue()) { + options.Range = Azure::Core::Http::HttpRange(); + options.Range.Value().Offset = offset; + options.Range.Value().Length = size; + //} + /* + if (initialChunkSize.HasValue()) { + options.TransferOptions.InitialChunkSize = initialChunkSize.Value(); + } + if (chunkSize.HasValue()) { + options.TransferOptions.ChunkSize = chunkSize.Value(); + } + */ + + auto res = blobClient.DownloadTo(buf, size, options); + if (check && res.Value.ContentRange.Length.Value() != size) { + code = TAOS_SYSTEM_ERROR(EIO); + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + TAOS_RETURN(code); + } + + *ppBlock = buf; + } catch (const Azure::Core::RequestFailedException &e) { + uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast(e.StatusCode), + e.ReasonPhrase.c_str()); + code = TAOS_SYSTEM_ERROR(EIO); + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + TAOS_RETURN(code); + } + + TAOS_RETURN(code); +} + +void azDeleteObjectsByPrefix(const char *prefix) { + const std::string delimiter = "/"; + std::string accountName = tsS3AccessKeyId[0]; + std::string accountKey = tsS3AccessKeySecret[0]; + std::string accountURL = tsS3Hostname[0]; + accountURL = "https://" + accountURL; + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = tsS3BucketName; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + Azure::Storage::Blobs::ListBlobsOptions options; + options.Prefix = prefix; + + std::set listBlobs; + for (auto pageResult = 
containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) { + for (const auto &blob : pageResult.Blobs) { + listBlobs.insert(blob.Name); + } + } + + for (auto blobName : listBlobs) { + auto blobClient = containerClient.GetAppendBlobClient(blobName); + blobClient.Delete(); + } + } catch (const Azure::Core::RequestFailedException &e) { + uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast(e.StatusCode), + e.ReasonPhrase.c_str()); + // uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TAOS_SYSTEM_ERROR(EIO))); + } +} + +int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } + +int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; } + +int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; } + +int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; } + +#else + +int32_t azBegin() { return TSDB_CODE_SUCCESS; } + +void azEnd() {} + +int32_t azCheckCfg() { return TSDB_CODE_SUCCESS; } + +int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { + return TSDB_CODE_SUCCESS; +} + +int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + return TSDB_CODE_SUCCESS; +} + +void azDeleteObjectsByPrefix(const char *prefix) {} + +int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } + +int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; } + +int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; } + +int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; } + +#endif diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp new file mode 100644 index 0000000000..e75c6ae17f --- /dev/null +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -0,0 +1,625 @@ +#if defined(USE_S3) + +#include "td_block_blob_client.hpp" + +#include + +#if defined(AZ_PLATFORM_WINDOWS) +#if !defined(WIN32_LEAN_AND_MEAN) +#define WIN32_LEAN_AND_MEAN +#endif +#if !defined(NOMINMAX) +#define NOMINMAX +#endif +#include +#endif + +#include "./avro_parser.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Azure { +namespace Storage { +namespace Blobs { + +TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::string& connectionString, + const std::string& blobContainerName, + const std::string& blobName, + const BlobClientOptions& options) { + TDBlockBlobClient newClient( + BlobClient::CreateFromConnectionString(connectionString, blobContainerName, blobName, options)); + return newClient; +} + +TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, + const BlobClientOptions& options) + : BlobClient(blobUrl, std::move(credential), options) {} + +TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, + std::shared_ptr credential, + const BlobClientOptions& options) + : BlobClient(blobUrl, std::move(credential), options) {} + +TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options) + : BlobClient(blobUrl, options) {} + +TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} + +TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const { + 
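+ // Work on a copy: clone this client, then rewrite only the snapshot query parameter.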
TDBlockBlobClient newClient(*this); + if (snapshot.empty()) { + newClient.m_blobUrl.RemoveQueryParameter(_internal::HttpQuerySnapshot); + } else { + newClient.m_blobUrl.AppendQueryParameter(_internal::HttpQuerySnapshot, + _internal::UrlEncodeQueryParameter(snapshot)); + } + return newClient; +} + +TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId) const { + TDBlockBlobClient newClient(*this); + if (versionId.empty()) { + newClient.m_blobUrl.RemoveQueryParameter(_internal::HttpQueryVersionId); + } else { + newClient.m_blobUrl.AppendQueryParameter(_internal::HttpQueryVersionId, + _internal::UrlEncodeQueryParameter(versionId)); + } + return newClient; +} + +Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, + const UploadBlockBlobOptions& options, + const Azure::Core::Context& context) const { + _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; + if (options.TransactionalContentHash.HasValue()) { + if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { + protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; + } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { + protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; + } + } + protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; + protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; + protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; + protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; + protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; + protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; + protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); + protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); + protocolLayerOptions.Tier = options.AccessTier; + protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; + protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; + protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; + protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; + protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; + protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; + if (m_customerProvidedKey.HasValue()) { + protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; + protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; + protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); + } + protocolLayerOptions.EncryptionScope = m_encryptionScope; + if (options.ImmutabilityPolicy.HasValue()) { + protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; + protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; + } + protocolLayerOptions.LegalHold = options.HasLegalHold; + + return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); +} + +Azure::Response TDBlockBlobClient::UploadFrom( + const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options, + const Azure::Core::Context& context) const { + constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL; + constexpr 
+  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
+  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
+  constexpr int64_t MaxBlockNumber = 50000;
+  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;
+
+  if (static_cast<uint64_t>(options.TransferOptions.SingleUploadThreshold) > (std::numeric_limits<size_t>::max)()) {
+    throw Azure::Core::RequestFailedException("Single upload threshold is too big");
+  }
+  if (bufferSize <= static_cast<size_t>(options.TransferOptions.SingleUploadThreshold)) {
+    Azure::Core::IO::MemoryBodyStream contentStream(buffer, bufferSize);
+    UploadBlockBlobOptions uploadBlockBlobOptions;
+    uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
+    uploadBlockBlobOptions.Metadata = options.Metadata;
+    uploadBlockBlobOptions.Tags = options.Tags;
+    uploadBlockBlobOptions.AccessTier = options.AccessTier;
+    uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+    uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
+    return Upload(contentStream, uploadBlockBlobOptions, context);
+  }
+
+  int64_t chunkSize;
+  if (options.TransferOptions.ChunkSize.HasValue()) {
+    chunkSize = options.TransferOptions.ChunkSize.Value();
+  } else {
+    int64_t minChunkSize = (bufferSize + MaxBlockNumber - 1) / MaxBlockNumber;
+    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
+    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
+  }
+  if (chunkSize > MaxStageBlockSize) {
+    throw Azure::Core::RequestFailedException("Block size is too big.");
+  }
+
+  std::vector<std::string> blockIds;
+  auto getBlockId = [](int64_t id) {
+    constexpr size_t BlockIdLength = 64;
+    std::string blockId = std::to_string(id);
+    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
+    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
+  };
+
+  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
+    Azure::Core::IO::MemoryBodyStream contentStream(buffer + offset, static_cast<size_t>(length));
+    StageBlockOptions chunkOptions;
+    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
+    if (chunkId == numChunks - 1) {
+      blockIds.resize(static_cast<size_t>(numChunks));
+    }
+  };
+
+  _internal::ConcurrentTransfer(0, bufferSize, chunkSize, options.TransferOptions.Concurrency, uploadBlockFunc);
+
+  for (size_t i = 0; i < blockIds.size(); ++i) {
+    blockIds[i] = getBlockId(static_cast<int64_t>(i));
+  }
+  CommitBlockListOptions commitBlockListOptions;
+  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
+  commitBlockListOptions.Metadata = options.Metadata;
+  commitBlockListOptions.Tags = options.Tags;
+  commitBlockListOptions.AccessTier = options.AccessTier;
+  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
+  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);
+
+  Models::UploadBlockBlobFromResult ret;
+  ret.ETag = std::move(commitBlockListResponse.Value.ETag);
+  ret.LastModified = std::move(commitBlockListResponse.Value.LastModified);
+  ret.VersionId = std::move(commitBlockListResponse.Value.VersionId);
+  ret.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
+  ret.EncryptionKeySha256 = std::move(commitBlockListResponse.Value.EncryptionKeySha256);
+  ret.EncryptionScope = std::move(commitBlockListResponse.Value.EncryptionScope);
+  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(ret),
+                                                            std::move(commitBlockListResponse.RawResponse));
+}
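+
+// Whole-file variant of UploadFrom(): a file no larger than
+// TransferOptions.SingleUploadThreshold is sent with a single Upload() call;
+// anything larger is split into blocks that are staged concurrently and then
+// committed in order.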
+Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
+    const std::string& fileName, const UploadBlockBlobFromOptions& options,
+    const Azure::Core::Context& context) const {
+  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
+  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
+  constexpr int64_t MaxBlockNumber = 50000;
+  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;
+
+  {
+    Azure::Core::IO::FileBodyStream contentStream(fileName);
+
+    if (contentStream.Length() <= options.TransferOptions.SingleUploadThreshold) {
+      UploadBlockBlobOptions uploadBlockBlobOptions;
+      uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
+      uploadBlockBlobOptions.Metadata = options.Metadata;
+      uploadBlockBlobOptions.Tags = options.Tags;
+      uploadBlockBlobOptions.AccessTier = options.AccessTier;
+      uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+      uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
+      return Upload(contentStream, uploadBlockBlobOptions, context);
+    }
+  }
+
+  std::vector<std::string> blockIds;
+  auto getBlockId = [](int64_t id) {
+    constexpr size_t BlockIdLength = 64;
+    std::string blockId = std::to_string(id);
+    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
+    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
+  };
+
+  _internal::FileReader fileReader(fileName);
+
+  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
+    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, length);
+    StageBlockOptions chunkOptions;
+    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
+    if (chunkId == numChunks - 1) {
+      blockIds.resize(static_cast<size_t>(numChunks));
+    }
+  };
+
+  int64_t chunkSize;
+  if (options.TransferOptions.ChunkSize.HasValue()) {
+    chunkSize = options.TransferOptions.ChunkSize.Value();
+  } else {
+    int64_t minChunkSize = (fileReader.GetFileSize() + MaxBlockNumber - 1) / MaxBlockNumber;
+    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
+    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
+  }
+  if (chunkSize > MaxStageBlockSize) {
+    throw Azure::Core::RequestFailedException("Block size is too big.");
+  }
+
+  _internal::ConcurrentTransfer(0, fileReader.GetFileSize(), chunkSize, options.TransferOptions.Concurrency,
+                                uploadBlockFunc);
+
+  for (size_t i = 0; i < blockIds.size(); ++i) {
+    blockIds[i] = getBlockId(static_cast<int64_t>(i));
+  }
+  CommitBlockListOptions commitBlockListOptions;
+  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
+  commitBlockListOptions.Metadata = options.Metadata;
+  commitBlockListOptions.Tags = options.Tags;
+  commitBlockListOptions.AccessTier = options.AccessTier;
+  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
+  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);
+
+  Models::UploadBlockBlobFromResult result;
+  result.ETag = commitBlockListResponse.Value.ETag;
+  result.LastModified = commitBlockListResponse.Value.LastModified;
+  result.VersionId = commitBlockListResponse.Value.VersionId;
+  result.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
+  result.EncryptionKeySha256 = commitBlockListResponse.Value.EncryptionKeySha256;
+  result.EncryptionScope = commitBlockListResponse.Value.EncryptionScope;
+  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
+                                                            std::move(commitBlockListResponse.RawResponse));
+}
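+
+// TDengine-specific overload: uploads only the byte range [offset, offset + size)
+// of the file, which is presumably the primitive behind azPutObjectFromFileOffset().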
+Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
+    const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options,
+    const Azure::Core::Context& context) const {
+  _internal::FileReader fileReader(fileName);
+
+  {
+    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, size);
+
+    if (size <= options.TransferOptions.SingleUploadThreshold) {
+      UploadBlockBlobOptions uploadBlockBlobOptions;
+      uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
+      uploadBlockBlobOptions.Metadata = options.Metadata;
+      uploadBlockBlobOptions.Tags = options.Tags;
+      uploadBlockBlobOptions.AccessTier = options.AccessTier;
+      uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+      uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
+      return Upload(contentStream, uploadBlockBlobOptions, context);
+    }
+  }
+
+  std::vector<std::string> blockIds;
+  auto getBlockId = [](int64_t id) {
+    constexpr size_t BlockIdLength = 64;
+    std::string blockId = std::to_string(id);
+    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
+    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
+  };
+
+  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
+    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, length);
+    StageBlockOptions chunkOptions;
+    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
+    if (chunkId == numChunks - 1) {
+      blockIds.resize(static_cast<size_t>(numChunks));
+    }
+  };
+
+  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
+  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
+  constexpr int64_t MaxBlockNumber = 50000;
+  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;
+
+  int64_t chunkSize;
+  if (options.TransferOptions.ChunkSize.HasValue()) {
+    chunkSize = options.TransferOptions.ChunkSize.Value();
+  } else {
+    int64_t minChunkSize = (size + MaxBlockNumber - 1) / MaxBlockNumber;
+    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
+    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
+  }
+  if (chunkSize > MaxStageBlockSize) {
+    throw Azure::Core::RequestFailedException("Block size is too big.");
+  }
+
+  _internal::ConcurrentTransfer(offset, size, chunkSize, options.TransferOptions.Concurrency, uploadBlockFunc);
+
+  for (size_t i = 0; i < blockIds.size(); ++i) {
+    blockIds[i] = getBlockId(static_cast<int64_t>(i));
+  }
+  CommitBlockListOptions commitBlockListOptions;
+  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
+  commitBlockListOptions.Metadata = options.Metadata;
+  commitBlockListOptions.Tags = options.Tags;
+  commitBlockListOptions.AccessTier = options.AccessTier;
+  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
+  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
+  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);
+
+  Models::UploadBlockBlobFromResult result;
+  result.ETag = commitBlockListResponse.Value.ETag;
+  result.LastModified = commitBlockListResponse.Value.LastModified;
+  result.VersionId = commitBlockListResponse.Value.VersionId;
+  result.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
+  result.EncryptionKeySha256 = commitBlockListResponse.Value.EncryptionKeySha256;
+  result.EncryptionScope = commitBlockListResponse.Value.EncryptionScope;
+  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
+                                                            std::move(commitBlockListResponse.RawResponse));
+}
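+
+// Illustrative usage of the range overload above (the connection string,
+// container, blob and file names are hypothetical):
+//
+//   auto client = TDBlockBlobClient::CreateFromConnectionString(
+//       connString, "td-container", "vnode2/v2f1900.data");
+//   UploadBlockBlobFromOptions options;
+//   client.UploadFrom("/var/lib/taos/v2f1900.data", offset, size, options);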
+
+Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadFromUri(
+    const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options,
+    const Azure::Core::Context& context) const {
+  _detail::BlockBlobClient::UploadBlockBlobFromUriOptions protocolLayerOptions;
+  protocolLayerOptions.CopySource = sourceUri;
+  protocolLayerOptions.CopySourceBlobProperties = options.CopySourceBlobProperties;
+  protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
+  protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
+  protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
+  protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
+  protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
+  protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
+  protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
+  protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
+  protocolLayerOptions.Tier = options.AccessTier;
+  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+  protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
+  protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
+  protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
+  protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
+  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
+  protocolLayerOptions.SourceIfMatch = options.SourceAccessConditions.IfMatch;
+  protocolLayerOptions.SourceIfNoneMatch = options.SourceAccessConditions.IfNoneMatch;
+  protocolLayerOptions.SourceIfModifiedSince = options.SourceAccessConditions.IfModifiedSince;
+  protocolLayerOptions.SourceIfUnmodifiedSince = options.SourceAccessConditions.IfUnmodifiedSince;
+  protocolLayerOptions.SourceIfTags = options.SourceAccessConditions.TagConditions;
+  if (options.TransactionalContentHash.HasValue()) {
+    if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
+      protocolLayerOptions.SourceContentMD5 = options.TransactionalContentHash.Value().Value;
+    } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
+      protocolLayerOptions.SourceContentcrc64 = options.TransactionalContentHash.Value().Value;
+    }
+  }
+  if (m_customerProvidedKey.HasValue()) {
+    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
+    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
+    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
+  }
+  protocolLayerOptions.EncryptionScope = m_encryptionScope;
+  protocolLayerOptions.CopySourceTags = options.CopySourceTagsMode;
+  if (!options.SourceAuthorization.empty()) {
+    protocolLayerOptions.CopySourceAuthorization = options.SourceAuthorization;
+  }
+
+  return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
+}
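+
+// StageBlock() uploads a single block identified by blockId. Staged blocks stay
+// uncommitted (and are eventually garbage-collected by the service, after about
+// a week) until CommitBlockList() assembles them into the blob.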
+Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const std::string& blockId,
+                                                                        Azure::Core::IO::BodyStream& content,
+                                                                        const StageBlockOptions& options,
+                                                                        const Azure::Core::Context& context) const {
+  _detail::BlockBlobClient::StageBlockBlobBlockOptions protocolLayerOptions;
+  protocolLayerOptions.BlockId = blockId;
+  if (options.TransactionalContentHash.HasValue()) {
+    if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
+      protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
+    } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
+      protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
+    }
+  }
+  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+  if (m_customerProvidedKey.HasValue()) {
+    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
+    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
+    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
+  }
+  protocolLayerOptions.EncryptionScope = m_encryptionScope;
+  return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
+}
+
+Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFromUri(
+    const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options,
+    const Azure::Core::Context& context) const {
+  _detail::BlockBlobClient::StageBlockBlobBlockFromUriOptions protocolLayerOptions;
+  protocolLayerOptions.BlockId = blockId;
+  protocolLayerOptions.SourceUrl = sourceUri;
+  if (options.SourceRange.HasValue()) {
+    std::string rangeStr = "bytes=" + std::to_string(options.SourceRange.Value().Offset) + "-";
+    if (options.SourceRange.Value().Length.HasValue()) {
+      rangeStr += std::to_string(options.SourceRange.Value().Offset + options.SourceRange.Value().Length.Value() - 1);
+    }
+    protocolLayerOptions.SourceRange = rangeStr;
+  }
+  if (options.TransactionalContentHash.HasValue()) {
+    if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
+      protocolLayerOptions.SourceContentMD5 = options.TransactionalContentHash.Value().Value;
+    } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
+      protocolLayerOptions.SourceContentcrc64 = options.TransactionalContentHash.Value().Value;
+    }
+  }
+  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+  protocolLayerOptions.SourceIfModifiedSince = options.SourceAccessConditions.IfModifiedSince;
+  protocolLayerOptions.SourceIfUnmodifiedSince = options.SourceAccessConditions.IfUnmodifiedSince;
+  protocolLayerOptions.SourceIfMatch = options.SourceAccessConditions.IfMatch;
+  protocolLayerOptions.SourceIfNoneMatch = options.SourceAccessConditions.IfNoneMatch;
+  if (m_customerProvidedKey.HasValue()) {
+    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
+    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
+    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
+  }
+  protocolLayerOptions.EncryptionScope = m_encryptionScope;
+  if (!options.SourceAuthorization.empty()) {
+    protocolLayerOptions.CopySourceAuthorization = options.SourceAuthorization;
+  }
+
+  return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
+}
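+
+// CommitBlockList() atomically creates or replaces the blob from the listed
+// block IDs; the order of the IDs is the order of the blocks in the blob.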
+Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockList(
+    const std::vector<std::string>& blockIds, const CommitBlockListOptions& options,
+    const Azure::Core::Context& context) const {
+  _detail::BlockBlobClient::CommitBlockBlobBlockListOptions protocolLayerOptions;
+  protocolLayerOptions.Blocks.Latest = blockIds;
+  protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
+  protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
+  protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
+  protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
+  protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
+  protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
+  protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
+  protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
+  protocolLayerOptions.Tier = options.AccessTier;
+  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+  protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
+  protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
+  protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
+  protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
+  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
+  if (m_customerProvidedKey.HasValue()) {
+    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
+    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
+    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
+  }
+  protocolLayerOptions.EncryptionScope = m_encryptionScope;
+  if (options.ImmutabilityPolicy.HasValue()) {
+    protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
+    protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
+  }
+  protocolLayerOptions.LegalHold = options.HasLegalHold;
+
+  return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
+}
+
+Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options,
+                                                                            const Azure::Core::Context& context) const {
+  _detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions;
+  protocolLayerOptions.ListType = options.ListType;
+  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
+  return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions,
+                                                _internal::WithReplicaStatus(context));
+}
+/*
+Azure::Response<Models::QueryBlobResult> TDBlockBlobClient::Query(const std::string& querySqlExpression,
+                                                                  const QueryBlobOptions& options,
+                                                                  const Azure::Core::Context& context) const {
+_detail::BlobClient::QueryBlobOptions protocolLayerOptions;
+protocolLayerOptions.QueryRequest.QueryType = Models::_detail::QueryRequestQueryType::SQL;
+protocolLayerOptions.QueryRequest.Expression = querySqlExpression;
+if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Delimited) {
+  Models::_detail::DelimitedTextConfiguration c;
+  c.RecordSeparator = options.InputTextConfiguration.m_recordSeparator;
+  c.ColumnSeparator = options.InputTextConfiguration.m_columnSeparator;
+  c.FieldQuote = options.InputTextConfiguration.m_quotationCharacter;
+  c.EscapeChar = options.InputTextConfiguration.m_escapeCharacter;
+  c.HeadersPresent = options.InputTextConfiguration.m_hasHeaders;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.InputTextConfiguration.m_format;
+  q.Format.DelimitedTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
+} else if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Json) {
+  Models::_detail::JsonTextConfiguration c;
+  c.RecordSeparator = options.InputTextConfiguration.m_recordSeparator;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.InputTextConfiguration.m_format;
+  q.Format.JsonTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
+} else if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Parquet) {
+  Models::_detail::ParquetConfiguration c;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.InputTextConfiguration.m_format;
+  q.Format.ParquetTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
+} else if (options.InputTextConfiguration.m_format.ToString().empty()) {
+} else {
+  AZURE_UNREACHABLE_CODE();
+}
+if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Delimited) {
+  Models::_detail::DelimitedTextConfiguration c;
+  c.RecordSeparator = options.OutputTextConfiguration.m_recordSeparator;
+  c.ColumnSeparator = options.OutputTextConfiguration.m_columnSeparator;
+  c.FieldQuote = options.OutputTextConfiguration.m_quotationCharacter;
+  c.EscapeChar = options.OutputTextConfiguration.m_escapeCharacter;
+  c.HeadersPresent = options.OutputTextConfiguration.m_hasHeaders;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.OutputTextConfiguration.m_format;
+  q.Format.DelimitedTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
+} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Json) {
+  Models::_detail::JsonTextConfiguration c;
+  c.RecordSeparator = options.OutputTextConfiguration.m_recordSeparator;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.OutputTextConfiguration.m_format;
+  q.Format.JsonTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
+} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Parquet) {
+  Models::_detail::ParquetConfiguration c;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.OutputTextConfiguration.m_format;
+  q.Format.ParquetTextConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
+} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Arrow) {
+  Models::_detail::ArrowConfiguration c;
+  c.Schema = options.OutputTextConfiguration.m_schema;
+  Models::_detail::QuerySerialization q;
+  q.Format.Type = options.OutputTextConfiguration.m_format;
+  q.Format.ArrowConfiguration = std::move(c);
+  protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
+} else if (options.InputTextConfiguration.m_format.ToString().empty()) {
+} else {
+  AZURE_UNREACHABLE_CODE();
+}
+
+protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
+if (m_customerProvidedKey.HasValue()) {
+  protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
+  protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
+  protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
+}
+protocolLayerOptions.EncryptionScope = m_encryptionScope;
+protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
+protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
+protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
+protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
+protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
+auto response =
+    _detail::BlobClient::Query(*m_pipeline, m_blobUrl, protocolLayerOptions, _internal::WithReplicaStatus(context));
+
+const auto statusCode = response.RawResponse->GetStatusCode();
+const auto reasonPhrase = response.RawResponse->GetReasonPhrase();
+const auto requestId = response.RawResponse->GetHeaders().count(_internal::HttpHeaderRequestId) != 0
+                           ? response.RawResponse->GetHeaders().at(_internal::HttpHeaderRequestId)
+                           : std::string();
+
+const auto clientRequestId = response.RawResponse->GetHeaders().count(_internal::HttpHeaderClientRequestId) != 0
+                                 ? response.RawResponse->GetHeaders().at(_internal::HttpHeaderClientRequestId)
+                                 : std::string();
+
+auto defaultErrorHandler = [statusCode, reasonPhrase, requestId, clientRequestId](BlobQueryError e) {
+  if (e.IsFatal) {
+    StorageException exception("Fatal " + e.Name + " at " + std::to_string(e.Position));
+    exception.StatusCode = statusCode;
+    exception.ReasonPhrase = reasonPhrase;
+    exception.RequestId = requestId;
+    exception.ClientRequestId = clientRequestId;
+    exception.ErrorCode = e.Name;
+    exception.Message = e.Description;
+
+    throw exception;
+  }
+};
+
+response.Value.BodyStream =
+    std::make_unique<_detail::AvroStreamParser>(std::move(response.Value.BodyStream), options.ProgressHandler,
+                                                options.ErrorHandler ? options.ErrorHandler : defaultErrorHandler);
+return response;
+}
+*/
+}  // namespace Blobs
+}  // namespace Storage
+}  // namespace Azure
+
+#endif
diff --git a/source/libs/azure/test/CMakeLists.txt b/source/libs/azure/test/CMakeLists.txt
new file mode 100644
index 0000000000..01570df730
--- /dev/null
+++ b/source/libs/azure/test/CMakeLists.txt
@@ -0,0 +1,18 @@
+aux_source_directory(. AZ_TEST_SRC)
+
+add_executable(azTest ${AZ_TEST_SRC})
+target_include_directories(azTest
+  PUBLIC
+  "${TD_SOURCE_DIR}/include/libs/azure"
+  "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+)
+
+target_link_libraries(azTest
+  az
+  gtest_main
+)
+enable_testing()
+add_test(
+  NAME az_test
+  COMMAND azTest
+)
diff --git a/source/libs/azure/test/azTest.cpp b/source/libs/azure/test/azTest.cpp
new file mode 100644
index 0000000000..9e963508f8
--- /dev/null
+++ b/source/libs/azure/test/azTest.cpp
@@ -0,0 +1,457 @@
+#include
+#include
+#include
+#include
+/*
+#include "walInt.h"
+
+const char* ranStr = "tvapq02tcp";
+const int ranStrLen = strlen(ranStr);
+SWalSyncInfo syncMeta = {0};
+
+class WalCleanEnv : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    int code = walInit(NULL);
+    ASSERT(code == 0);
+  }
+
+  static void TearDownTestCase() { walCleanUp(); }
+
+  void SetUp() override {
+    taosRemoveDir(pathName);
+    SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
+    memset(pCfg, 0, sizeof(SWalCfg));
+    pCfg->rollPeriod = -1;
+    pCfg->segSize = -1;
+    pCfg->retentionPeriod = 0;
+    pCfg->retentionSize = 0;
+    pCfg->level = TAOS_WAL_FSYNC;
+    pWal = walOpen(pathName, pCfg);
+    taosMemoryFree(pCfg);
+    ASSERT(pWal != NULL);
+  }
+
+  void TearDown() override {
+    walClose(pWal);
+    pWal = NULL;
+  }
+
+  SWal* pWal = NULL;
+  const char* pathName = TD_TMP_DIR_PATH "wal_test";
+};
+
+class WalCleanDeleteEnv : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    int code = walInit(NULL);
+    ASSERT(code == 0);
+  }
+
+  static void TearDownTestCase() { walCleanUp(); }
+
+  void SetUp() override {
+    taosRemoveDir(pathName);
+    SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
+    memset(pCfg, 0, sizeof(SWalCfg));
+    pCfg->retentionPeriod = 0;
+    pCfg->retentionSize = 0;
+    pCfg->level = TAOS_WAL_FSYNC;
+    pWal = walOpen(pathName, pCfg);
+    taosMemoryFree(pCfg);
+    ASSERT(pWal != NULL);
+  }
+
+  void TearDown() override {
+    walClose(pWal);
+    pWal = NULL;
+  }
+
+  SWal* pWal = NULL;
+  const char* pathName = TD_TMP_DIR_PATH "wal_test";
+};
+
+class WalKeepEnv : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    int code = walInit(NULL);
+    ASSERT(code == 0);
+  }
+
+  static void TearDownTestCase() { walCleanUp(); }
+
+  void walResetEnv() {
+    TearDown();
+    taosRemoveDir(pathName);
+    SetUp();
+  }
+
+  void SetUp() override {
+    SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
+    memset(pCfg, 0, sizeof(SWalCfg));
+    pCfg->rollPeriod = -1;
+    pCfg->segSize = -1;
+    pCfg->retentionPeriod = 0;
+    pCfg->retentionSize = 0;
+    pCfg->level = TAOS_WAL_FSYNC;
+    pWal = walOpen(pathName, pCfg);
+    taosMemoryFree(pCfg);
+    ASSERT(pWal != NULL);
+  }
+
+  void TearDown() override {
+    walClose(pWal);
+    pWal = NULL;
+  }
+
+  SWal* pWal = NULL;
+  const char* pathName = TD_TMP_DIR_PATH "wal_test";
+};
+
+class WalRetentionEnv : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    int code = walInit(NULL);
+    ASSERT(code == 0);
+  }
+
+  static void TearDownTestCase() { walCleanUp(); }
+
+  void walResetEnv() {
+    TearDown();
+    taosRemoveDir(pathName);
+    SetUp();
+  }
+
+  void SetUp() override {
+    SWalCfg cfg;
+    cfg.rollPeriod = -1;
+    cfg.segSize = -1;
+    cfg.retentionPeriod = -1;
+    cfg.retentionSize = 0;
+    cfg.rollPeriod = 0;
+    cfg.vgId = 0;
+    cfg.level = TAOS_WAL_FSYNC;
+    pWal = walOpen(pathName, &cfg);
+    ASSERT(pWal != NULL);
+  }
+
+  void TearDown() override {
+    walClose(pWal);
+    pWal = NULL;
+  }
+
+  SWal* pWal = NULL;
+  const char* pathName = TD_TMP_DIR_PATH "wal_test";
+};
+
+TEST_F(WalCleanEnv, createNew) {
+  walRollFileInfo(pWal);
+  ASSERT(pWal->fileInfoSet != NULL);
+  ASSERT_EQ(pWal->fileInfoSet->size, 1);
+  SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
+  ASSERT_EQ(pInfo->firstVer, 0);
+  ASSERT_EQ(pInfo->lastVer, -1);
+  ASSERT_EQ(pInfo->closeTs, -1);
+  ASSERT_EQ(pInfo->fileSize, 0);
+}
+
+TEST_F(WalCleanEnv, serialize) {
+  int code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  ASSERT(pWal->fileInfoSet != NULL);
+
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  char* ss = NULL;
+  code = walMetaSerialize(pWal, &ss);
+  ASSERT(code == 0);
+  printf("%s\n", ss);
+  taosMemoryFree(ss);
+  code = walSaveMeta(pWal);
+  ASSERT(code == 0);
+}
+
+TEST_F(WalCleanEnv, removeOldMeta) {
+  int code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  ASSERT(pWal->fileInfoSet != NULL);
+  code = walSaveMeta(pWal);
+  ASSERT(code == 0);
+  code = walRollFileInfo(pWal);
+  ASSERT(code == 0);
+  code = walSaveMeta(pWal);
+  ASSERT(code == 0);
+}
+
+TEST_F(WalKeepEnv, readOldMeta) {
+  walResetEnv();
+  int code;
+
+  syncMeta.isWeek = -1;
+  syncMeta.seqNum = UINT64_MAX;
+  syncMeta.term = UINT64_MAX;
+
+  for (int i = 0; i < 10; i++) {
+    code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+    code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+  }
+  char* oldss = NULL;
+  code = walMetaSerialize(pWal, &oldss);
+  ASSERT(code == 0);
+
+  TearDown();
+  SetUp();
+
+  ASSERT_EQ(pWal->vers.firstVer, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 9);
+
+  char* newss = NULL;
+  code = walMetaSerialize(pWal, &newss);
+  ASSERT(code == 0);
+
+  int len = strlen(oldss);
+  ASSERT_EQ(len, strlen(newss));
+  for (int i = 0; i < len; i++) {
+    EXPECT_EQ(oldss[i], newss[i]);
+  }
+  taosMemoryFree(oldss);
+  taosMemoryFree(newss);
+}
+
+TEST_F(WalCleanEnv, write) {
+  int code;
+  for (int i = 0; i < 10; i++) {
+    code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+    code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+  }
+  code = walSaveMeta(pWal);
+  ASSERT_EQ(code, 0);
+}
+
+TEST_F(WalCleanEnv, rollback) {
+  int code;
+  for (int i = 0; i < 10; i++) {
+    code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+  }
+  code = walRollback(pWal, 12);
+  ASSERT_NE(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 9);
+  code = walRollback(pWal, 9);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 8);
+  code = walRollback(pWal, 5);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 4);
+  code = walRollback(pWal, 3);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 2);
+  code = walSaveMeta(pWal);
+  ASSERT_EQ(code, 0);
+}
+
+TEST_F(WalCleanEnv, rollbackMultiFile) {
+  int code;
+  for (int i = 0; i < 10; i++) {
+    code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+    if (i == 5) {
+      walBeginSnapshot(pWal, i, 0);
+      walEndSnapshot(pWal);
+    }
+  }
+  code = walRollback(pWal, 12);
+  ASSERT_NE(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 9);
+  code = walRollback(pWal, 9);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 8);
+  code = walRollback(pWal, 6);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 5);
+  code = walRollback(pWal, 5);
+  ASSERT_NE(code, 0);
+
+  ASSERT_EQ(pWal->vers.lastVer, 5);
+
+  code = walAppendLog(pWal, 6, 6, syncMeta, (void*)ranStr, ranStrLen);
+  ASSERT_EQ(code, 0);
+  ASSERT_EQ(pWal->vers.lastVer, 6);
+
+  code = walSaveMeta(pWal);
+  ASSERT_EQ(code, 0);
+}
+
+TEST_F(WalCleanDeleteEnv, roll) {
+  int code;
+  int i;
+  for (i = 0; i < 100; i++) {
+    code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    ASSERT_EQ(pWal->vers.lastVer, i);
+    code = walCommit(pWal, i);
+    ASSERT_EQ(pWal->vers.commitVer, i);
+  }
+
+  walBeginSnapshot(pWal, i - 1, 0);
+  ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
+  walEndSnapshot(pWal);
+  ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
+  ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
+
+  code = walAppendLog(pWal, 5, 0, syncMeta, (void*)ranStr, ranStrLen);
+  ASSERT_NE(code, 0);
+
+  for (; i < 200; i++) {
+    code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
+    ASSERT_EQ(code, 0);
+    code = walCommit(pWal, i);
+    ASSERT_EQ(pWal->vers.commitVer, i);
+  }
+
+  code = walBeginSnapshot(pWal, i - 1, 0);
+  ASSERT_EQ(code, 0);
+  code = walEndSnapshot(pWal);
+  ASSERT_EQ(code, 0);
+}
+
+TEST_F(WalKeepEnv, readHandleRead) {
+  walResetEnv();
+  int code;
+  SWalReader* pRead = walOpenReader(pWal, NULL, 0);
+  ASSERT(pRead != NULL);
+
+  int i;
+  for (i = 0; i < 100; i++) {
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, i);
+    int len = strlen(newStr);
+    code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
+    ASSERT_EQ(code, 0);
+  }
+  for (int i = 0; i < 1000; i++) {
+    int ver = taosRand() % 100;
+    code = walReadVer(pRead, ver);
+    ASSERT_EQ(code, 0);
+
+    // printf("rrbody: \n");
+    // for(int i = 0; i < pRead->pHead->head.len; i++) {
+    //   printf("%d ", pRead->pHead->head.body[i]);
+    //}
+    // printf("\n");
+
+    ASSERT_EQ(pRead->pHead->head.version, ver);
+    ASSERT_EQ(pRead->curVersion, ver + 1);
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, ver);
+    int len = strlen(newStr);
+    ASSERT_EQ(pRead->pHead->head.bodyLen, len);
+    for (int j = 0; j < len; j++) {
+      EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
+    }
+  }
+  walCloseReader(pRead);
+}
+
+TEST_F(WalRetentionEnv, repairMeta1) {
+  walResetEnv();
+  int code;
+
+  int i;
+  for (i = 0; i < 100; i++) {
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, i);
+    int len = strlen(newStr);
+    code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
+    ASSERT_EQ(code, 0);
+  }
+
+  TearDown();
+
+  // getchar();
+  char buf[100];
+  sprintf(buf, "%s/meta-ver%d", pathName, 0);
+  taosRemoveFile(buf);
+  sprintf(buf, "%s/meta-ver%d", pathName, 1);
+  taosRemoveFile(buf);
+  SetUp();
+  // getchar();
+
+  ASSERT_EQ(pWal->vers.lastVer, 99);
+
+  SWalReader* pRead = walOpenReader(pWal, NULL, 0);
+  ASSERT(pRead != NULL);
+
+  for (int i = 0; i < 1000; i++) {
+    int ver = taosRand() % 100;
+    code = walReadVer(pRead, ver);
+    ASSERT_EQ(code, 0);
+
+    // printf("rrbody: \n");
+    // for(int i = 0; i < pRead->pHead->head.len; i++) {
+    //   printf("%d ", pRead->pHead->head.body[i]);
+    //}
+    // printf("\n");
+
+    ASSERT_EQ(pRead->pHead->head.version, ver);
+    ASSERT_EQ(pRead->curVersion, ver + 1);
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, ver);
+    int len = strlen(newStr);
+    ASSERT_EQ(pRead->pHead->head.bodyLen, len);
+    for (int j = 0; j < len; j++) {
+      EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
+    }
+  }
+
+  for (i = 100; i < 200; i++) {
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, i);
+    int len = strlen(newStr);
+    code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
+    ASSERT_EQ(code, 0);
+  }
+
+  for (int i = 0; i < 1000; i++) {
+    int ver = taosRand() % 200;
+    code = walReadVer(pRead, ver);
+    ASSERT_EQ(code, 0);
+
+    // printf("rrbody: \n");
+    // for(int i = 0; i < pRead->pHead->head.len; i++) {
+    //   printf("%d ", pRead->pHead->head.body[i]);
+    //}
+    // printf("\n");
+
+    ASSERT_EQ(pRead->pHead->head.version, ver);
+    ASSERT_EQ(pRead->curVersion, ver + 1);
+    char newStr[100];
+    sprintf(newStr, "%s-%d", ranStr, ver);
+    int len = strlen(newStr);
+    ASSERT_EQ(pRead->pHead->head.bodyLen, len);
+    for (int j = 0; j < len; j++) {
+      EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
+    }
+  }
+  walCloseReader(pRead);
+}
+*/
diff --git a/source/libs/tcs/CMakeLists.txt b/source/libs/tcs/CMakeLists.txt
index 4d74dedcd0..1c914a18b9 100644
--- a/source/libs/tcs/CMakeLists.txt
+++ b/source/libs/tcs/CMakeLists.txt
@@ -9,7 +9,7 @@ target_include_directories(
 target_link_libraries(
   tcs
-  # PUBLIC az
+  PUBLIC az
   PUBLIC common
   # PUBLIC cjson
   # PUBLIC os
diff --git a/source/libs/tcs/src/tcs.c b/source/libs/tcs/src/tcs.c
index c5c68c4933..5facffa4ac 100644
--- a/source/libs/tcs/src/tcs.c
+++ b/source/libs/tcs/src/tcs.c
@@ -19,7 +19,7 @@
 #include "taoserror.h"
 #include "tglobal.h"
 
-//#include "az.h"
+#include "az.h"
 #include "cos.h"
 
 extern int8_t tsS3Ablob;
@@ -68,7 +68,6 @@ int32_t tcsInit() {
     tcs.DeleteObjects = s3DeleteObjects;
     tcs.GetObjectToFile = s3GetObjectToFile;
   } else if (TOS_PROTO_ABLOB == proto) {
-    /*
     tcs.Begin = azBegin;
     tcs.End = azEnd;
     tcs.CheckCfg = azCheckCfg;
@@ -82,7 +81,7 @@ int32_t tcsInit() {
     tcs.GetObjectsByPrefix = azGetObjectsByPrefix;
     tcs.DeleteObjects = azDeleteObjects;
     tcs.GetObjectToFile = azGetObjectToFile;
-    */
+
   } else {
     code = TSDB_CODE_INVALID_PARA;
    return code;