az: libaz module from ablob

This commit is contained in:
Minglei Jin 2024-10-10 18:48:34 +08:00
parent 3905c94f03
commit c0ef07c050
12 changed files with 2572 additions and 5 deletions

45
include/libs/azure/az.h Normal file
View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_AZURE_H_
#define _TD_AZURE_H_
#include "os.h"
#include "tarray.h"
#include "tdef.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus
extern "C" {
#endif
// Public C interface of the libaz module: object operations against Azure
// Blob Storage. The int32_t-returning functions follow the surrounding
// codebase's convention of 0 on success, non-zero error code on failure.
// NOTE(review): convention inferred from the return types — confirm against
// the implementation in source/libs/azure.
// Initializes the module; call once before any other az* function.
int32_t azBegin();
// Tears down state established by azBegin().
void azEnd();
// Verifies that the configured Azure endpoint/credentials are usable.
int32_t azCheckCfg();
// Uploads `size` bytes starting at `offset` of local file `file` to the blob
// named `object_name`.
int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size);
// Downloads `size` bytes at `offset` from blob `object_name` into a buffer
// returned through `ppBlock`; `check` presumably enables extra validation of
// the downloaded block — confirm in the implementation.
int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock);
// Deletes every blob whose name begins with `prefix`.
void azDeleteObjectsByPrefix(const char *prefix);
// Uploads a whole local file to blob `object`; `withcp` presumably toggles
// checkpointing of the transfer — confirm in the implementation.
int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp);
// Downloads all blobs whose names match `prefix` into directory `path`.
int32_t azGetObjectsByPrefix(const char *prefix, const char *path);
// Downloads a single blob into local file `fileName`.
int32_t azGetObjectToFile(const char *object_name, const char *fileName);
// Deletes the `nobject` blobs listed in `object_name`.
int32_t azDeleteObjects(const char *object_name[], int nobject);
#ifdef __cplusplus
}
#endif
#endif // _TD_AZURE_H_

View File

@@ -23,5 +23,5 @@ add_subdirectory(planner)
add_subdirectory(qworker)
add_subdirectory(geometry)
add_subdirectory(command)
# The azure module is now built unconditionally; the stale commented-out
# duplicate of this line (diff residue) has been removed.
add_subdirectory(azure)
add_subdirectory(tcs)

View File

@@ -0,0 +1,32 @@
#if(${TD_LINUX})
# Builds the Azure blob storage client library (libaz) from every source
# file under src/.
aux_source_directory(src AZ_SRC)
add_library(az STATIC ${AZ_SRC})
# The Azure SDK code paths are compiled in only when object-storage (S3)
# support is enabled; they are guarded by USE_S3 in the sources.
if(${BUILD_S3})
add_definitions(-DUSE_S3)
target_link_libraries(
az
PUBLIC _azure_sdk
PUBLIC crypt
)
endif()
target_include_directories(
az
PUBLIC "${TD_SOURCE_DIR}/include/libs/azure"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
# Core project dependencies that are required regardless of BUILD_S3.
target_link_libraries(
az
PUBLIC cjson
PUBLIC os
PUBLIC util
PUBLIC common
)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
#endif(${TD_LINUX})

View File

@@ -0,0 +1,260 @@
#pragma once
#include "azure/storage/blobs/blob_client.hpp"
#include <cstdint>
#include <string>
#include <vector>
// Forward declaration so TDBlockBlobClient can befriend the DataLake file
// client without including the DataLake headers.
// Fixed: the class named in the friend declaration below is
// Files::DataLake::DataLakeFileClient, not FileClient — a qualified friend
// declaration requires a prior declaration of exactly that name, so the
// mismatched forward declaration left the friend referring to an undeclared
// type.
namespace Azure {
namespace Storage {
namespace Files {
namespace DataLake {
class DataLakeFileClient;
}
} // namespace Files
} // namespace Storage
} // namespace Azure
namespace Azure {
namespace Storage {
namespace Blobs {
/**
 * @brief The TDBlockBlobClient allows you to manipulate Azure Storage block blobs.
 *
 * Block blobs let you upload large blobs efficiently. Block blobs are comprised of blocks, each
 * of which is identified by a block ID. You create or modify a block blob by writing a set of
 * blocks and committing them by their block IDs. Each block can be a different size.
 *
 * When you upload a block to a blob in your storage account, it is associated with the specified
 * block blob, but it does not become part of the blob until you commit a list of blocks that
 * includes the new block's ID. New blocks remain in an uncommitted state until they are
 * specifically committed or discarded. Writing a block does not update the last modified time of
 * an existing blob.
 */
class TDBlockBlobClient final : public BlobClient {
 public:
  /**
   * @brief Initialize a new instance of TDBlockBlobClient.
   *
   * @param connectionString A connection string includes the authentication information required
   * for your application to access data in an Azure Storage account at runtime.
   * @param blobContainerName The name of the container containing this blob.
   * @param blobName The name of this blob.
   * @param options Optional client options that define the transport pipeline policies for
   * authentication, retries, etc., that are applied to every request.
   * @return A new TDBlockBlobClient instance.
   */
  static TDBlockBlobClient CreateFromConnectionString(const std::string& connectionString,
                                                      const std::string& blobContainerName, const std::string& blobName,
                                                      const BlobClientOptions& options = BlobClientOptions());
  /**
   * @brief Initialize a new instance of TDBlockBlobClient.
   *
   * @param blobUrl A URL
   * referencing the blob that includes the name of the account, the name of the container, and
   * the name of the blob.
   * @param credential The shared key credential used to sign
   * requests.
   * @param options Optional client options that define the transport pipeline
   * policies for authentication, retries, etc., that are applied to every request.
   */
  explicit TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr<StorageSharedKeyCredential> credential,
                             const BlobClientOptions& options = BlobClientOptions());
  /**
   * @brief Initialize a new instance of TDBlockBlobClient.
   *
   * @param blobUrl A URL
   * referencing the blob that includes the name of the account, the name of the container, and
   * the name of the blob.
   * @param credential The token credential used to sign requests.
   * @param options Optional client options that define the transport pipeline policies for
   * authentication, retries, etc., that are applied to every request.
   */
  explicit TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr<Core::Credentials::TokenCredential> credential,
                             const BlobClientOptions& options = BlobClientOptions());
  /**
   * @brief Initialize a new instance of TDBlockBlobClient.
   *
   * @param blobUrl A URL
   * referencing the blob that includes the name of the account, the name of the container, and
   * the name of the blob, and possibly also a SAS token.
   * @param options Optional client
   * options that define the transport pipeline policies for authentication, retries, etc., that
   * are applied to every request.
   */
  explicit TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options = BlobClientOptions());
  /**
   * @brief Initializes a new instance of the TDBlockBlobClient class with an identical URL
   * source but the specified snapshot timestamp.
   *
   * @param snapshot The snapshot
   * identifier.
   * @return A new TDBlockBlobClient instance.
   * @remarks Pass empty string to remove the snapshot returning the base blob.
   */
  TDBlockBlobClient WithSnapshot(const std::string& snapshot) const;
  /**
   * @brief Creates a clone of this instance that references a version ID rather than the base
   * blob.
   *
   * @param versionId The version ID returning a URL to the base blob.
   * @return A new TDBlockBlobClient instance.
   * @remarks Pass empty string to remove the version ID returning the base blob.
   */
  TDBlockBlobClient WithVersionId(const std::string& versionId) const;
  /**
   * @brief Creates a new block blob, or updates the content of an existing block blob. Updating
   * an existing block blob overwrites any existing metadata on the blob.
   *
   * @param content A BodyStream containing the content to upload.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A UploadBlockBlobResult describing the state of the updated block blob.
   */
  Azure::Response<Models::UploadBlockBlobResult> Upload(
      Azure::Core::IO::BodyStream& content, const UploadBlockBlobOptions& options = UploadBlockBlobOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new block blob, or updates the content of an existing block blob. Updating
   * an existing block blob overwrites any existing metadata on the blob.
   *
   * @param buffer A memory buffer containing the content to upload.
   * @param bufferSize Size of the memory buffer.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A UploadBlockBlobFromResult describing the state of the updated block blob.
   */
  Azure::Response<Models::UploadBlockBlobFromResult> UploadFrom(
      const uint8_t* buffer, size_t bufferSize,
      const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new block blob, or updates the content of an existing block blob. Updating
   * an existing block blob overwrites any existing metadata on the blob.
   *
   * @param fileName A file containing the content to upload.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A UploadBlockBlobFromResult describing the state of the updated block blob.
   */
  Azure::Response<Models::UploadBlockBlobFromResult> UploadFrom(
      const std::string& fileName, const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new block blob from a slice of a local file. TDengine extension over the
   * stock Azure SDK client: only `size` bytes starting at `offset` are uploaded.
   *
   * @param fileName A file containing the content to upload.
   * @param offset Byte offset into the file at which the slice starts.
   * @param size Number of bytes of the file to upload.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A UploadBlockBlobFromResult describing the state of the updated block blob.
   */
  Azure::Response<Models::UploadBlockBlobFromResult> UploadFrom(
      const std::string& fileName, int64_t offset, int64_t size,
      const UploadBlockBlobFromOptions& options = UploadBlockBlobFromOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new Block Blob where the contents of the blob are read from a given URL.
   *
   * @param sourceUri Specifies the URL of the source blob.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A UploadBlockBlobFromUriResult describing the state of the updated block blob.
   */
  Azure::Response<Models::UploadBlockBlobFromUriResult> UploadFromUri(
      const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options = UploadBlockBlobFromUriOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new block as part of a block blob's staging area to be eventually
   * committed via the CommitBlockList operation.
   *
   * @param blockId A valid Base64 string value that identifies the block. Prior to encoding, the
   * string must be less than or equal to 64 bytes in size.
   * @param content A BodyStream containing the content to upload.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A StageBlockResult describing the state of the updated block.
   */
  Azure::Response<Models::StageBlockResult> StageBlock(
      const std::string& blockId, Azure::Core::IO::BodyStream& content,
      const StageBlockOptions& options = StageBlockOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Creates a new block to be committed as part of a blob where the contents are read from
   * the sourceUri.
   *
   * @param blockId A valid Base64 string value that identifies the block. Prior to encoding, the
   * string must be less than or equal to 64 bytes in size.
   * @param sourceUri Specifies the uri of the source
   * blob. The value may be a uri of up to 2 KB in length that specifies a blob. The source blob
   * must either be public or must be authenticated via a shared access signature. If the source
   * blob is public, no authentication is required to perform the operation.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A StageBlockFromUriResult describing the state of the updated block blob.
   */
  Azure::Response<Models::StageBlockFromUriResult> StageBlockFromUri(
      const std::string& blockId, const std::string& sourceUri,
      const StageBlockFromUriOptions& options = StageBlockFromUriOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Writes a blob by specifying the list of block IDs that make up the blob. In order to
   * be written as part of a blob, a block must have been successfully written to the server in a
   * prior StageBlock operation. You can call CommitBlockList to update a blob by uploading only
   * those blocks that have changed, then committing the new and existing blocks together. You can
   * do this by specifying whether to commit a block from the committed block list or from the
   * uncommitted block list, or to commit the most recently uploaded version of the block,
   * whichever list it may belong to.
   *
   * @param blockIds Base64 encoded block IDs that make up the blob.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A CommitBlockListResult describing the state of the updated block blob.
   */
  Azure::Response<Models::CommitBlockListResult> CommitBlockList(
      const std::vector<std::string>& blockIds, const CommitBlockListOptions& options = CommitBlockListOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Retrieves the list of blocks that have been uploaded as part of a block blob. There
   * are two block lists maintained for a blob. The Committed Block list has blocks that have been
   * successfully committed to a given blob with CommitBlockList. The Uncommitted Block list has
   * blocks that have been uploaded for a blob using StageBlock, but that have not yet been
   * committed.
   *
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A GetBlockListResult describing requested block list.
   */
  Azure::Response<Models::GetBlockListResult> GetBlockList(
      const GetBlockListOptions& options = GetBlockListOptions(),
      const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Returns the result of a query against the blob.
   *
   * @param querySqlExpression The query expression in SQL.
   * @param options Optional parameters to execute this function.
   * @param context Context for cancelling long running operations.
   * @return A QueryBlobResult describing the query result.
   */
  Azure::Response<Models::QueryBlobResult> Query(const std::string& querySqlExpression,
                                                 const QueryBlobOptions& options = QueryBlobOptions(),
                                                 const Azure::Core::Context& context = Azure::Core::Context()) const;
  /**
   * @brief Wraps an existing BlobClient (same URL and pipeline) as a TDBlockBlobClient.
   */
  explicit TDBlockBlobClient(BlobClient blobClient);

 private:
  // Allow the SDK clients to construct TDBlockBlobClient from their internals.
  friend class BlobClient;
  friend class Files::DataLake::DataLakeFileClient;
};
} // namespace Blobs
} // namespace Storage
} // namespace Azure

View File

@@ -0,0 +1,531 @@
#if defined(USE_S3)
#include "avro_parser.hpp"
#include <azure/core/azure_assert.hpp>
#include <azure/core/internal/json/json.hpp>
#include <algorithm>
#include <cstring>
namespace Azure {
namespace Storage {
namespace Blobs {
namespace _detail {
namespace {
// Decodes one Avro zig-zag varint from an in-memory buffer, advancing the
// read position. In-memory counterpart of AvroStreamReader::ParseInt.
int64_t parseInt(AvroStreamReader::ReaderPos& data) {
  uint64_t raw = 0;
  int shift = 0;
  uint8_t byte = 0;
  do {
    byte = (*data.BufferPtr)[data.Offset++];
    raw |= (static_cast<uint64_t>(byte) & 0x7f) << shift;
    shift += 7;
  } while (byte & 0x80);
  // Zig-zag decode: even values are non-negative, odd values are negative.
  return static_cast<int64_t>(raw >> 1) ^ -static_cast<int64_t>(raw & 0x01);
}
// Parses an Avro schema from its JSON text into an AvroSchema tree.
// Supports the primitive types plus record, array, map, union and fixed;
// enum, namespaces and aliases are rejected with std::runtime_error.
AvroSchema ParseSchemaFromJsonString(const std::string& jsonSchema) {
  // Fixed: removed the duplicate {"string", ...} entry that appeared twice.
  const static std::map<std::string, AvroSchema> BuiltinNameSchemaMap = {
      {"string", AvroSchema::StringSchema}, {"bytes", AvroSchema::BytesSchema},   {"int", AvroSchema::IntSchema},
      {"long", AvroSchema::LongSchema},     {"float", AvroSchema::FloatSchema},   {"double", AvroSchema::DoubleSchema},
      {"boolean", AvroSchema::BoolSchema},  {"null", AvroSchema::NullSchema},
  };
  // Named types (record, fixed) defined inline become referable by name later.
  std::map<std::string, AvroSchema> nameSchemaMap = BuiltinNameSchemaMap;
  std::function<AvroSchema(const Core::Json::_internal::json& obj)> parseSchemaFromJsonObject;
  parseSchemaFromJsonObject = [&](const Core::Json::_internal::json& obj) -> AvroSchema {
    if (obj.is_string()) {
      auto typeName = obj.get<std::string>();
      auto i = nameSchemaMap.find(typeName);
      if (i == nameSchemaMap.end()) {
        // Fixed: previously dereferenced end() (UB) on an unknown type name.
        throw std::runtime_error("Undefined type " + typeName + " in Avro schema.");
      }
      return i->second;
    } else if (obj.is_array()) {
      std::vector<AvroSchema> unionSchemas;
      for (const auto& s : obj) {
        unionSchemas.push_back(parseSchemaFromJsonObject(s));
      }
      return AvroSchema::UnionSchema(std::move(unionSchemas));
    } else if (obj.is_object()) {
      if (obj.count("namespace") != 0) {
        throw std::runtime_error("Namespace isn't supported yet in Avro schema.");
      }
      if (obj.count("aliases") != 0) {
        throw std::runtime_error("Alias isn't supported yet in Avro schema.");
      }
      auto typeName = obj["type"].get<std::string>();
      auto i = nameSchemaMap.find(typeName);
      if (i != nameSchemaMap.end()) {
        return i->second;
      }
      if (typeName == "record") {
        std::vector<std::pair<std::string, AvroSchema>> fieldsSchema;
        for (const auto& field : obj["fields"]) {
          fieldsSchema.push_back(
              std::make_pair(field["name"].get<std::string>(), parseSchemaFromJsonObject(field["type"])));
        }
        const std::string recordName = obj["name"].get<std::string>();
        auto recordSchema = AvroSchema::RecordSchema(recordName, std::move(fieldsSchema));
        nameSchemaMap.insert(std::make_pair(recordName, recordSchema));
        return recordSchema;
      } else if (typeName == "enum") {
        throw std::runtime_error("Enum type isn't supported yet in Avro schema.");
      } else if (typeName == "array") {
        return AvroSchema::ArraySchema(parseSchemaFromJsonObject(obj["items"]));
      } else if (typeName == "map") {
        // Fixed: per the Avro specification a map declares its value type
        // under "values", not "items"; keep "items" as a fallback so any
        // previously-accepted input keeps working.
        const auto& valueType = obj.count("values") != 0 ? obj["values"] : obj["items"];
        return AvroSchema::MapSchema(parseSchemaFromJsonObject(valueType));
      } else if (typeName == "fixed") {
        const std::string fixedName = obj["name"].get<std::string>();
        auto fixedSchema = AvroSchema::FixedSchema(fixedName, obj["size"].get<int64_t>());
        nameSchemaMap.insert(std::make_pair(fixedName, fixedSchema));
        return fixedSchema;
      } else {
        throw std::runtime_error("Unrecognized type " + typeName + " in Avro schema.");
      }
    }
    AZURE_UNREACHABLE_CODE();
  };
  auto jsonRoot = Core::Json::_internal::json::parse(jsonSchema.begin(), jsonSchema.end());
  return parseSchemaFromJsonObject(jsonRoot);
}
} // namespace
// Decodes one zig-zag varint from the stream, pulling bytes into the buffer
// one at a time as needed.
int64_t AvroStreamReader::ParseInt(const Core::Context& context) {
  uint64_t raw = 0;
  int shift = 0;
  uint8_t byte = 0;
  do {
    Preload(1, context);
    byte = m_streambuffer[m_pos.Offset++];
    raw |= (static_cast<uint64_t>(byte) & 0x7f) << shift;
    shift += 7;
  } while (byte & 0x80);
  // Zig-zag decode.
  return static_cast<int64_t>(raw >> 1) ^ -static_cast<int64_t>(raw & 0x01);
}
// Skips n bytes of stream data, preloading them first so the offset never
// moves past the buffered region.
void AvroStreamReader::Advance(size_t n, const Core::Context& context) {
  Preload(n, context);
  m_pos.Offset += n;
}
// Blocking preload: keeps reading from the stream until at least n bytes are
// buffered. Throws if the stream ends first. Returns the number of bytes now
// available in m_streambuffer.
size_t AvroStreamReader::Preload(size_t n, const Core::Context& context) {
  size_t oldAvailable = AvailableBytes();
  while (true) {
    size_t newAvailable = TryPreload(n, context);
    if (newAvailable >= n) {
      return newAvailable;
    }
    // No forward progress means the underlying stream hit EOF short of n.
    if (oldAvailable == newAvailable) {
      throw std::runtime_error("Unexpected EOF of Avro stream.");
    }
    oldAvailable = newAvailable;
  }
  AZURE_UNREACHABLE_CODE();
}
// Best-effort preload: performs at most one stream read, requesting at least
// n bytes (and at least MinRead to amortize small reads). Returns the bytes
// now available, which may still be fewer than n at end of stream.
size_t AvroStreamReader::TryPreload(size_t n, const Core::Context& context) {
  const size_t buffered = AvailableBytes();
  if (buffered >= n) {
    return buffered;
  }
  const size_t MinRead = 4096;
  const size_t readRequest = (std::max)(n, MinRead);
  const size_t oldSize = m_streambuffer.size();
  m_streambuffer.resize(oldSize + readRequest);
  const size_t bytesRead = m_stream->Read(m_streambuffer.data() + oldSize, readRequest, context);
  // Trim the buffer back down to what was actually read.
  m_streambuffer.resize(oldSize + bytesRead);
  return AvailableBytes();
}
// Drops already-consumed bytes from the front of the buffer once enough have
// accumulated, sliding the unread tail to the start and resetting the offset.
void AvroStreamReader::Discard() {
  constexpr size_t MinimumReleaseMemory = 128 * 1024;
  if (m_pos.Offset < MinimumReleaseMemory) {
    return;
  }
  const size_t availableBytes = AvailableBytes();
  // Fixed: use data() pointer arithmetic instead of operator[]. When the
  // buffer is fully consumed (m_pos.Offset == size()), indexing one past the
  // end via operator[] is undefined behavior even though memmove itself is
  // fine with a zero count.
  std::memmove(m_streambuffer.data(), m_streambuffer.data() + m_pos.Offset, availableBytes);
  m_streambuffer.resize(availableBytes);
  m_pos.Offset = 0;
}
// Shared singleton schemas for the eight Avro primitive types; complex
// schemas are built via the factory functions below.
const AvroSchema AvroSchema::StringSchema(AvroDatumType::String);
const AvroSchema AvroSchema::BytesSchema(AvroDatumType::Bytes);
const AvroSchema AvroSchema::IntSchema(AvroDatumType::Int);
const AvroSchema AvroSchema::LongSchema(AvroDatumType::Long);
const AvroSchema AvroSchema::FloatSchema(AvroDatumType::Float);
const AvroSchema AvroSchema::DoubleSchema(AvroDatumType::Double);
const AvroSchema AvroSchema::BoolSchema(AvroDatumType::Bool);
const AvroSchema AvroSchema::NullSchema(AvroDatumType::Null);
// Builds a record schema: field names and field schemas are kept in two
// parallel vectors inside the shared status block.
AvroSchema AvroSchema::RecordSchema(std::string name,
                                    const std::vector<std::pair<std::string, AvroSchema>>& fieldsSchema) {
  AvroSchema schema(AvroDatumType::Record);
  schema.m_name = std::move(name);
  schema.m_status = std::make_shared<SharedStatus>();
  for (const auto& field : fieldsSchema) {
    schema.m_status->m_keys.push_back(field.first);
    schema.m_status->m_schemas.push_back(field.second);
  }
  return schema;
}
// Builds an array schema whose single child is the element type.
AvroSchema AvroSchema::ArraySchema(AvroSchema elementSchema) {
  AvroSchema schema(AvroDatumType::Array);
  schema.m_status = std::make_shared<SharedStatus>();
  schema.m_status->m_schemas.push_back(std::move(elementSchema));
  return schema;
}
// Builds a map schema whose single child is the value type (Avro map keys
// are always strings).
AvroSchema AvroSchema::MapSchema(AvroSchema elementSchema) {
  AvroSchema schema(AvroDatumType::Map);
  schema.m_status = std::make_shared<SharedStatus>();
  schema.m_status->m_schemas.push_back(std::move(elementSchema));
  return schema;
}
// Builds a union schema over the given alternative schemas, in order.
AvroSchema AvroSchema::UnionSchema(std::vector<AvroSchema> schemas) {
  AvroSchema schema(AvroDatumType::Union);
  schema.m_status = std::make_shared<SharedStatus>();
  schema.m_status->m_schemas = std::move(schemas);
  return schema;
}
// Builds a fixed-size binary schema of the given byte width.
AvroSchema AvroSchema::FixedSchema(std::string name, int64_t size) {
  AvroSchema schema(AvroDatumType::Fixed);
  schema.m_name = std::move(name);
  schema.m_status = std::make_shared<SharedStatus>();
  schema.m_status->m_size = size;
  return schema;
}
// Consumes one datum of this schema from the stream without materializing a
// value: m_data records where the datum starts in the reader's buffer, then
// the binary encoding is walked so the reader ends up positioned just past
// it. The value is decoded lazily later via Value<T>().
void AvroDatum::Fill(AvroStreamReader& reader, const Core::Context& context) {
  m_data = reader.m_pos;
  if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) {
    // Length-prefixed byte run.
    int64_t stringSize = reader.ParseInt(context);
    reader.Advance(static_cast<size_t>(stringSize), context);
  } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long ||
             m_schema.Type() == AvroDatumType::Enum) {
    // All three are encoded as a single varint.
    reader.ParseInt(context);
  } else if (m_schema.Type() == AvroDatumType::Float) {
    reader.Advance(4, context);
  } else if (m_schema.Type() == AvroDatumType::Double) {
    reader.Advance(8, context);
  } else if (m_schema.Type() == AvroDatumType::Bool) {
    reader.Advance(1, context);
  } else if (m_schema.Type() == AvroDatumType::Null) {
    reader.Advance(0, context);
  } else if (m_schema.Type() == AvroDatumType::Record) {
    // A record is simply its fields encoded back to back.
    for (const auto& s : m_schema.FieldSchemas()) {
      AvroDatum(s).Fill(reader, context);
    }
  } else if (m_schema.Type() == AvroDatumType::Array) {
    // Arrays are a sequence of blocks: a zero count terminates; a negative
    // count is followed by the block's byte size, letting us skip it whole.
    while (true) {
      int64_t numElementsInBlock = reader.ParseInt(context);
      if (numElementsInBlock == 0) {
        break;
      } else if (numElementsInBlock < 0) {
        int64_t blockSize = reader.ParseInt(context);
        reader.Advance(static_cast<size_t>(blockSize), context);
      } else {
        for (auto i = 0; i < numElementsInBlock; ++i) {
          AvroDatum(m_schema.ItemSchema()).Fill(reader, context);
        }
      }
    }
  } else if (m_schema.Type() == AvroDatumType::Map) {
    // Same block structure as arrays, but entries are string-key/value pairs.
    while (true) {
      int64_t numElementsInBlock = reader.ParseInt(context);
      if (numElementsInBlock == 0) {
        break;
      } else if (numElementsInBlock < 0) {
        int64_t blockSize = reader.ParseInt(context);
        reader.Advance(static_cast<size_t>(blockSize), context);
      } else {
        for (int64_t i = 0; i < numElementsInBlock; ++i) {
          AvroDatum(AvroSchema::StringSchema).Fill(reader, context);
          AvroDatum(m_schema.ItemSchema()).Fill(reader, context);
        }
      }
    }
  } else if (m_schema.Type() == AvroDatumType::Union) {
    // A union stores the branch index, then the branch's datum.
    int64_t i = reader.ParseInt(context);
    AvroDatum(m_schema.FieldSchemas()[static_cast<size_t>(i)]).Fill(reader, context);
  } else if (m_schema.Type() == AvroDatumType::Fixed) {
    reader.Advance(m_schema.Size(), context);
  } else {
    AZURE_UNREACHABLE_CODE();
  }
}
// In-memory counterpart of Fill(reader, context): walks one datum starting
// at `data`, recording its start in m_data and advancing `data` past it.
// Used when the bytes are already buffered, so no stream reads are needed.
void AvroDatum::Fill(AvroStreamReader::ReaderPos& data) {
  m_data = data;
  if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) {
    // Length-prefixed byte run.
    int64_t stringSize = parseInt(data);
    data.Offset += static_cast<size_t>(stringSize);
  } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long ||
             m_schema.Type() == AvroDatumType::Enum) {
    parseInt(data);
  } else if (m_schema.Type() == AvroDatumType::Float) {
    data.Offset += 4;
  } else if (m_schema.Type() == AvroDatumType::Double) {
    data.Offset += 8;
  } else if (m_schema.Type() == AvroDatumType::Bool) {
    data.Offset += 1;
  } else if (m_schema.Type() == AvroDatumType::Null) {
    data.Offset += 0;
  } else if (m_schema.Type() == AvroDatumType::Record) {
    // Fields are encoded back to back.
    for (const auto& s : m_schema.FieldSchemas()) {
      AvroDatum(s).Fill(data);
    }
  } else if (m_schema.Type() == AvroDatumType::Array) {
    // Block-encoded sequence; see the stream overload for the block format.
    while (true) {
      int64_t numElementsInBlock = parseInt(data);
      if (numElementsInBlock == 0) {
        break;
      } else if (numElementsInBlock < 0) {
        int64_t blockSize = parseInt(data);
        data.Offset += static_cast<size_t>(blockSize);
      } else {
        for (auto i = 0; i < numElementsInBlock; ++i) {
          AvroDatum(m_schema.ItemSchema()).Fill(data);
        }
      }
    }
  } else if (m_schema.Type() == AvroDatumType::Map) {
    // Block-encoded string-key/value pairs.
    while (true) {
      int64_t numElementsInBlock = parseInt(data);
      if (numElementsInBlock == 0) {
        break;
      } else if (numElementsInBlock < 0) {
        int64_t blockSize = parseInt(data);
        data.Offset += static_cast<size_t>(blockSize);
      } else {
        for (int64_t i = 0; i < numElementsInBlock; ++i) {
          AvroDatum(AvroSchema::StringSchema).Fill(data);
          AvroDatum(m_schema.ItemSchema()).Fill(data);
        }
      }
    }
  } else if (m_schema.Type() == AvroDatumType::Union) {
    // Branch index, then the branch's datum.
    int64_t i = parseInt(data);
    AvroDatum(m_schema.FieldSchemas()[static_cast<size_t>(i)]).Fill(data);
  } else if (m_schema.Type() == AvroDatumType::Fixed) {
    data.Offset += m_schema.Size();
  } else {
    AZURE_UNREACHABLE_CODE();
  }
}
// Returns a non-owning view of the datum's bytes. Valid only for
// String/Bytes (length-prefixed) and Fixed (schema-sized) data; the view
// points into the reader buffer recorded at Fill() time, so it must not
// outlive that buffer.
template <>
AvroDatum::StringView AvroDatum::Value() const {
  auto data = m_data;
  if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) {
    const int64_t length = parseInt(data);
    const uint8_t* start = &(*data.BufferPtr)[data.Offset];
    StringView ret{start, static_cast<size_t>(length)};
    data.Offset += static_cast<size_t>(length);
    return ret;
  }
  if (m_schema.Type() == AvroDatumType::Fixed) {
    const size_t fixedSize = m_schema.Size();
    const uint8_t* start = &(*data.BufferPtr)[data.Offset];
    StringView ret{start, fixedSize};
    data.Offset += fixedSize;
    return ret;
  }
  AZURE_UNREACHABLE_CODE();
}
// Materializes the datum as an owning std::string (String/Bytes/Fixed).
template <>
std::string AvroDatum::Value() const {
  const auto view = Value<StringView>();
  return std::string(view.Data, view.Data + view.Length);
}
// Materializes the datum as an owning byte vector.
template <>
std::vector<uint8_t> AvroDatum::Value() const {
  const auto view = Value<StringView>();
  return std::vector<uint8_t>(view.Data, view.Data + view.Length);
}
// Decodes the datum as a 64-bit integer (Int/Long/Enum varint encoding).
template <>
int64_t AvroDatum::Value() const {
  auto pos = m_data;
  return parseInt(pos);
}
// Narrows the 64-bit decoding to 32 bits.
template <>
int32_t AvroDatum::Value() const {
  return static_cast<int32_t>(Value<int64_t>());
}
// A boolean is encoded as an integer; any non-zero value is true.
template <>
bool AvroDatum::Value() const {
  return Value<int64_t>() != 0;
}
// Null carries no payload.
template <>
std::nullptr_t AvroDatum::Value() const {
  return nullptr;
}
// Decodes a record by filling one datum per field in schema order; the key
// list is shared with (points into) the schema's field-name vector.
template <>
AvroRecord AvroDatum::Value() const {
  auto pos = m_data;
  AvroRecord record;
  record.m_keys = &m_schema.FieldNames();
  for (const auto& fieldSchema : m_schema.FieldSchemas()) {
    AvroDatum fieldDatum(fieldSchema);
    fieldDatum.Fill(pos);
    record.m_values.push_back(std::move(fieldDatum));
  }
  return record;
}
// Decodes a block-encoded Avro map into string-keyed datums. A zero element
// count ends the map; a negative count means |count| entries preceded by a
// block byte size (which is read and discarded, since entries are decoded
// individually anyway).
template <>
AvroMap AvroDatum::Value() const {
  auto data = m_data;
  AvroMap m;
  while (true) {
    int64_t numElementsInBlock = parseInt(data);
    if (numElementsInBlock == 0) {
      break;
    }
    if (numElementsInBlock < 0) {
      numElementsInBlock = -numElementsInBlock;
      parseInt(data);  // skip the block's byte-size prefix
    }
    for (int64_t i = 0; i < numElementsInBlock; ++i) {
      auto keyDatum = AvroDatum(AvroSchema::StringSchema);
      keyDatum.Fill(data);
      auto valueDatum = AvroDatum(m_schema.ItemSchema());
      valueDatum.Fill(data);
      m[keyDatum.Value<std::string>()] = valueDatum;
    }
  }
  return m;
}
// Resolves a union datum to the datum of its active branch.
template <>
AvroDatum AvroDatum::Value() const {
  if (m_schema.Type() != AvroDatumType::Union) {
    AZURE_UNREACHABLE_CODE();
  }
  auto pos = m_data;
  const int64_t branch = parseInt(pos);
  AvroDatum branchDatum(m_schema.FieldSchemas()[static_cast<size_t>(branch)]);
  branchDatum.Fill(pos);
  return branchDatum;
}
// Wraps the response body stream in an owned AvroStreamReader; the header is
// parsed lazily on the first NextImpl() call.
AvroObjectContainerReader::AvroObjectContainerReader(Core::IO::BodyStream& stream)
    : m_reader(std::make_unique<AvroStreamReader>(stream)) {}
// Returns the next object datum from an Avro object-container stream.
// When `schema` is null this is the first call: the container file header
// (magic "Obj\1", metadata map, 16-byte sync marker) is parsed, the object
// schema is extracted from the "avro.schema" metadata entry, and only the
// "null" codec (uncompressed) is accepted. Objects are block-framed: each
// block carries an object count and byte size, and ends with the sync
// marker, which is validated. m_eof is set when the stream is exhausted.
AvroDatum AvroObjectContainerReader::NextImpl(const AvroSchema* schema, const Core::Context& context) {
  AZURE_ASSERT_FALSE(m_eof);
  static const auto SyncMarkerSchema = AvroSchema::FixedSchema("Sync", 16);
  if (!schema) {
    static AvroSchema FileHeaderSchema = []() {
      std::vector<std::pair<std::string, AvroSchema>> fieldsSchema;
      fieldsSchema.push_back(std::make_pair("magic", AvroSchema::FixedSchema("Magic", 4)));
      fieldsSchema.push_back(std::make_pair("meta", AvroSchema::MapSchema(AvroSchema::BytesSchema)));
      fieldsSchema.push_back(std::make_pair("sync", SyncMarkerSchema));
      return AvroSchema::RecordSchema("org.apache.avro.file.Header", std::move(fieldsSchema));
    }();
    auto fileHeaderDatum = AvroDatum(FileHeaderSchema);
    fileHeaderDatum.Fill(*m_reader, context);
    auto fileHeader = fileHeaderDatum.Value<AvroRecord>();
    if (fileHeader.Field("magic").Value<std::string>() != "Obj\01") {
      throw std::runtime_error("Invalid Avro object container magic.");
    }
    AvroMap meta = fileHeader.Field("meta").Value<AvroMap>();
    std::string objectSchemaJson = meta["avro.schema"].Value<std::string>();
    std::string codec = "null";
    if (meta.count("avro.codec") != 0) {
      codec = meta["avro.codec"].Value<std::string>();
    }
    if (codec != "null") {
      throw std::runtime_error("Unsupported Avro codec: " + codec);
    }
    m_syncMarker = fileHeader.Field("sync").Value<std::string>();
    m_objectSchema = std::make_unique<AvroSchema>(ParseSchemaFromJsonString(objectSchemaJson));
    schema = m_objectSchema.get();
  }
  if (m_remainingObjectInCurrentBlock == 0) {
    m_reader->Discard();
    // Block prefix: object count, then total byte size of the objects.
    m_remainingObjectInCurrentBlock = m_reader->ParseInt(context);
    int64_t ObjectsSize = m_reader->ParseInt(context);
    m_reader->Preload(static_cast<size_t>(ObjectsSize), context);
  }
  // NOTE(review): reads *m_objectSchema rather than *schema — if a caller
  // ever passed a non-null schema before the header was parsed,
  // m_objectSchema would still be null here. Confirm the caller always
  // passes nullptr on first use.
  auto objectDatum = AvroDatum(*m_objectSchema);
  objectDatum.Fill(*m_reader, context);
  if (--m_remainingObjectInCurrentBlock == 0) {
    // End of block: verify the sync marker and probe for EOF.
    auto markerDatum = AvroDatum(SyncMarkerSchema);
    markerDatum.Fill(*m_reader, context);
    auto marker = markerDatum.Value<std::string>();
    if (marker != m_syncMarker) {
      throw std::runtime_error("Sync marker doesn't match.");
    }
    m_eof = m_reader->TryPreload(1, context) == 0;
  }
  return objectDatum;
}
// BodyStream read callback: surfaces the "data" payload of query-result
// records to the caller while routing progress and error records to the
// registered callbacks. Returns 0 at end of the Avro stream.
size_t AvroStreamParser::OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) {
  // Serve any bytes left over from the previously decoded data record first.
  if (m_parserBuffer.Length != 0) {
    size_t bytesToCopy = (std::min)(m_parserBuffer.Length, count);
    std::memcpy(buffer, m_parserBuffer.Data, bytesToCopy);
    m_parserBuffer.Data += bytesToCopy;
    m_parserBuffer.Length -= bytesToCopy;
    return bytesToCopy;
  }
  while (!m_parser.End()) {
    auto datum = m_parser.Next(context);
    // Records arrive wrapped in a union; unwrap to the active branch.
    if (datum.Schema().Type() == AvroDatumType::Union) {
      datum = datum.Value<AvroDatum>();
    }
    if (datum.Schema().Type() != AvroDatumType::Record) {
      continue;
    }
    if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.resultData") {
      // Stash the payload view and recurse once to copy from it.
      auto record = datum.Value<AvroRecord>();
      auto dataDatum = record.Field("data");
      m_parserBuffer = dataDatum.Value<AvroDatum::StringView>();
      return OnRead(buffer, count, context);
    }
    if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.progress" && m_progressCallback) {
      auto record = datum.Value<AvroRecord>();
      auto bytesScanned = record.Field("bytesScanned").Value<int64_t>();
      auto totalBytes = record.Field("totalBytes").Value<int64_t>();
      m_progressCallback(bytesScanned, totalBytes);
    }
    if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.error" && m_errorCallback) {
      auto record = datum.Value<AvroRecord>();
      BlobQueryError e;
      e.Name = record.Field("name").Value<std::string>();
      e.Description = record.Field("description").Value<std::string>();
      e.IsFatal = record.Field("fatal").Value<bool>();
      e.Position = record.Field("position").Value<int64_t>();
      m_errorCallback(std::move(e));
    }
  }
  return 0;
}
} // namespace _detail
} // namespace Blobs
} // namespace Storage
} // namespace Azure
#endif

View File

@@ -0,0 +1,198 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include "azure/storage/blobs/blob_options.hpp"
#include <azure/core/io/body_stream.hpp>
#include <map>
#include <memory>
#include <type_traits>
namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
// The Avro type of a datum as decoded from the stream. Covers the Avro
// primitive types plus the complex types (record, enum, array, map,
// union, fixed).
enum class AvroDatumType
{
  String,
  Bytes,
  Int,
  Long,
  Float,
  Double,
  Bool,
  Null,
  Record,
  Enum,
  Array,
  Map,
  Union,
  Fixed,
};
// Buffered reader over a BodyStream used by the Avro decoder. Bytes are
// accumulated into m_streambuffer and consumed through m_pos so that
// parsed datums can hold stable views into the buffer.
class AvroStreamReader final {
 public:
  // position of a vector that lives through vector resizing
  struct ReaderPos final
  {
    const std::vector<uint8_t>* BufferPtr = nullptr;
    size_t Offset = 0;
  };
  explicit AvroStreamReader(Core::IO::BodyStream& stream)
      : m_stream(&stream), m_pos{&m_streambuffer, 0}
  {
  }
  // Non-copyable: holds a pointer to the stream and self-referential m_pos.
  AvroStreamReader(const AvroStreamReader&) = delete;
  AvroStreamReader& operator=(const AvroStreamReader&) = delete;

  // Decodes a zigzag/varint-encoded Avro integer at the current position.
  int64_t ParseInt(const Core::Context& context);
  void Advance(size_t n, const Core::Context& context);
  // Read at least n bytes from m_stream and append data to m_streambuffer. Return number of bytes
  // available in m_streambuffer;
  size_t Preload(size_t n, const Core::Context& context);
  // Like Preload() but does not throw on a short stream — TODO confirm
  // exact short-read semantics from the implementation.
  size_t TryPreload(size_t n, const Core::Context& context);
  // discards data that's before m_pos
  void Discard();

 private:
  size_t AvailableBytes() const { return m_streambuffer.size() - m_pos.Offset; }

 private:
  Core::IO::BodyStream* m_stream;
  std::vector<uint8_t> m_streambuffer;
  ReaderPos m_pos;

  friend class AvroDatum;
};
// Value-semantic description of an Avro type. Primitive schemas are
// exposed as shared singletons; complex schemas (record/array/map/
// union/fixed) are built through the static factory functions and share
// their field data via m_status.
class AvroSchema final {
 public:
  static const AvroSchema StringSchema;
  static const AvroSchema BytesSchema;
  static const AvroSchema IntSchema;
  static const AvroSchema LongSchema;
  static const AvroSchema FloatSchema;
  static const AvroSchema DoubleSchema;
  static const AvroSchema BoolSchema;
  static const AvroSchema NullSchema;
  static AvroSchema RecordSchema(
      std::string name,
      const std::vector<std::pair<std::string, AvroSchema>>& fieldsSchema);
  static AvroSchema ArraySchema(AvroSchema elementSchema);
  static AvroSchema MapSchema(AvroSchema elementSchema);
  static AvroSchema UnionSchema(std::vector<AvroSchema> schemas);
  static AvroSchema FixedSchema(std::string name, int64_t size);

  const std::string& Name() const { return m_name; }
  AvroDatumType Type() const { return m_type; }
  // NOTE(review): the accessors below dereference m_status, which is only
  // populated by the complex-schema factories — calling them on a
  // primitive schema would be invalid. Confirm with call sites.
  const std::vector<std::string>& FieldNames() const { return m_status->m_keys; }
  // Element schema for array/map/union schemas (first sub-schema).
  AvroSchema ItemSchema() const { return m_status->m_schemas[0]; }
  const std::vector<AvroSchema>& FieldSchemas() const { return m_status->m_schemas; }
  // Byte size for fixed schemas.
  size_t Size() const { return static_cast<size_t>(m_status->m_size); }

 private:
  explicit AvroSchema(AvroDatumType type) : m_type(type) {}

 private:
  AvroDatumType m_type;
  std::string m_name;
  // Shared between copies of the same schema; keeps recursive schemas cheap to copy.
  struct SharedStatus
  {
    std::vector<std::string> m_keys;
    std::vector<AvroSchema> m_schemas;
    int64_t m_size = 0;
  };
  std::shared_ptr<SharedStatus> m_status;
};
class AvroDatum final {
public:
AvroDatum() : m_schema(AvroSchema::NullSchema) {}
explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {}
void Fill(AvroStreamReader& reader, const Core::Context& context);
void Fill(AvroStreamReader::ReaderPos& data);
const AvroSchema& Schema() const { return m_schema; }
template <class T> T Value() const;
struct StringView
{
const uint8_t* Data = nullptr;
size_t Length = 0;
};
private:
AvroSchema m_schema;
AvroStreamReader::ReaderPos m_data;
};
using AvroMap = std::map<std::string, AvroDatum>;
// A decoded Avro record: parallel key/value storage. m_keys points at
// the schema's field-name vector (not owned); m_values holds one datum
// per field.
class AvroRecord final {
 public:
  bool HasField(const std::string& key) const { return FindField(key) != m_keys->size(); }
  // NOTE: Field() on a missing key throws std::out_of_range via at();
  // guard with HasField() when the field is optional.
  const AvroDatum& Field(const std::string& key) const { return m_values.at(FindField(key)); }
  AvroDatum& Field(const std::string& key) { return m_values.at(FindField(key)); }
  const AvroDatum& FieldAt(size_t i) const { return m_values.at(i); }
  AvroDatum& FieldAt(size_t i) { return m_values.at(i); }

 private:
  // Linear scan; returns m_keys->size() when the key is absent.
  size_t FindField(const std::string& key) const
  {
    auto i = find(m_keys->begin(), m_keys->end(), key);
    return i - m_keys->begin();
  }
  const std::vector<std::string>* m_keys = nullptr;
  std::vector<AvroDatum> m_values;

  friend class AvroDatum;
};
// Reads an Avro Object Container File from a body stream and yields one
// datum per object via Next() until End() reports exhaustion.
class AvroObjectContainerReader final {
 public:
  explicit AvroObjectContainerReader(Core::IO::BodyStream& stream);

  bool End() const { return m_eof; }
  // Calling Next() will invalidates the previous AvroDatum returned by this function and all
  // AvroDatums propagated from there.
  AvroDatum Next(const Core::Context& context) { return NextImpl(m_objectSchema.get(), context); }

 private:
  AvroDatum NextImpl(const AvroSchema* schema, const Core::Context& context);

 private:
  std::unique_ptr<AvroStreamReader> m_reader;
  // Schema parsed from the container header; applied to every object.
  std::unique_ptr<AvroSchema> m_objectSchema;
  // 16-byte block delimiter from the container header.
  std::string m_syncMarker;
  int64_t m_remainingObjectInCurrentBlock = 0;
  bool m_eof = false;
};
// BodyStream adapter that wraps a query-response Avro stream and exposes
// only the raw data bytes, invoking optional callbacks for progress and
// error records encountered along the way.
class AvroStreamParser final : public Core::IO::BodyStream {
 public:
  explicit AvroStreamParser(
      std::unique_ptr<Azure::Core::IO::BodyStream> inner,
      std::function<void(int64_t, int64_t)> progressCallback,
      std::function<void(BlobQueryError)> errorCallback)
      : m_inner(std::move(inner)), m_parser(*m_inner),
        m_progressCallback(std::move(progressCallback)), m_errorCallback(std::move(errorCallback))
  {
  }

  // Total length is unknown until the stream is fully parsed.
  int64_t Length() const override { return -1; }
  // NOTE(review): only the inner stream is rewound; parser state
  // (m_parser, m_parserBuffer) is not reset here — confirm callers never
  // rewind mid-parse.
  void Rewind() override { this->m_inner->Rewind(); }

 private:
  size_t OnRead(uint8_t* buffer, size_t count, const Azure::Core::Context& context) override;

 private:
  std::unique_ptr<Azure::Core::IO::BodyStream> m_inner;
  AvroObjectContainerReader m_parser;
  std::function<void(int64_t, int64_t)> m_progressCallback;
  std::function<void(BlobQueryError)> m_errorCallback;
  // Unconsumed remainder of the most recent data record.
  AvroDatum::StringView m_parserBuffer;
};
}}}} // namespace Azure::Storage::Blobs::_detail

View File

@ -0,0 +1,402 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "az.h"
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#if defined(USE_S3)
#include <azure/core.hpp>
#include <azure/storage/blobs.hpp>
#include "td_block_blob_client.hpp"
// Add appropriate using namespace directives
using namespace Azure::Storage;
using namespace Azure::Storage::Blobs;
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
extern char tsS3BucketName[TSDB_FQDN_LEN];
extern int8_t tsS3Enabled;
extern int8_t tsS3EpNum;
// Module init/teardown hooks. The Azure SDK needs no global setup, so
// both are no-ops; kept for interface symmetry with other blob backends.
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
void azEnd() {}
// Prints the S3/blob configuration of endpoint `epIndex` to stdout for
// diagnostics. NOTE(review): this prints the access key and secret in
// clear text — intended for interactive config checks only.
static void azDumpCfgByEp(int8_t epIndex) {
  // clang-format off
  (void)fprintf(stdout,
                "%-24s %s\n"
                "%-24s %s\n"
                "%-24s %s\n"
                // "%-24s %s\n"
                "%-24s %s\n"
                "%-24s %s\n",
                "hostName", tsS3Hostname[epIndex],
                "bucketName", tsS3BucketName,
                "protocol", "https only",
                //"uristyle", (uriStyleG[epIndex] == S3UriStyleVirtualHost ? "virtualhost" : "path"),
                "accessKey", tsS3AccessKeyId[epIndex],
                "accessKeySecret", tsS3AccessKeySecret[epIndex]);
  // clang-format on
}
// Lists blobs in `bucketname` whose names start with "s3", printing each
// name to stderr. Uses the first configured endpoint/credential pair.
//
// Returns 0 on success, TAOS_SYSTEM_ERROR(EIO) when the Azure request fails.
static int32_t azListBucket(char const *bucketname) {
  int32_t           code = 0;
  const std::string delimiter = "/";
  std::string       accountName = tsS3AccessKeyId[0];
  std::string       accountKey = tsS3AccessKeySecret[0];
  std::string       accountURL = tsS3Hostname[0];
  accountURL = "https://" + accountURL;

  try {
    // Shared-key auth; the shared_ptr owns the credential (a previous
    // revision also leaked a raw `new StorageSharedKeyCredential` here).
    auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);

    BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);

    std::string containerName = bucketname;
    auto        containerClient = blobServiceClient.GetBlobContainerClient(containerName);

    Azure::Storage::Blobs::ListBlobsOptions options;
    options.Prefix = "s3";

    (void)fprintf(stderr, "objects:\n");
    // Iterate all result pages; each page holds a batch of blob items.
    for (auto pageResult = containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) {
      for (const auto &blob : pageResult.Blobs) {
        (void)fprintf(stderr, "%s\n", blob.Name.c_str());
      }
    }
  } catch (const Azure::Core::RequestFailedException &e) {
    uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast<int>(e.StatusCode),
           e.ReasonPhrase.c_str());

    code = TAOS_SYSTEM_ERROR(EIO);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
// Self-test of the blob configuration: writes a small local file,
// uploads it, lists the bucket, range-reads the object back, and deletes
// it, reporting each step to stdout/stderr. Returns 0 when every step
// succeeds; otherwise the first failing step's error code.
int32_t azCheckCfg() {
  int32_t code = 0, lino = 0;
  int8_t  i = 0;

  // Loop over endpoints is currently disabled; only endpoint 0 is tested.
  // for (; i < tsS3EpNum; i++) {
  (void)fprintf(stdout, "test s3 ep (%d/%d):\n", i + 1, tsS3EpNum);
  // s3DumpCfgByEp(i);
  azDumpCfgByEp(0);

  // test put
  char        testdata[17] = "0123456789abcdef";
  const char *objectname[] = {"s3test.txt"};
  char        path[PATH_MAX] = {0};
  int         ds_len = strlen(TD_DIRSEP);
  int         tmp_len = strlen(tsTempDir);

  // Build "<tsTempDir>[/]s3test.txt", appending a separator only when
  // tsTempDir does not already end with one.
  (void)snprintf(path, PATH_MAX, "%s", tsTempDir);
  if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
    (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
    (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]);
  } else {
    (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]);
  }

  uint8_t *pBlock = NULL;
  int      c_offset = 10;  // range-get test: bytes [10, 16) of the test data
  int      c_len = 6;
  char     buf[7] = {0};

  // Create the local test file holding the 16-byte payload.
  TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (!fp) {
    (void)fprintf(stderr, "failed to open test file: %s.\n", path);
    // uError("ERROR: %s Failed to open %s", __func__, path);
    TAOS_CHECK_GOTO(terrno, &lino, _next);
  }
  if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
    (void)fprintf(stderr, "failed to write test file: %s.\n", path);
    TAOS_CHECK_GOTO(terrno, &lino, _next);
  }
  if (taosFsyncFile(fp) < 0) {
    (void)fprintf(stderr, "failed to fsync test file: %s.\n", path);
    TAOS_CHECK_GOTO(terrno, &lino, _next);
  }
  (void)taosCloseFile(&fp);

  (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata);
  code = azPutObjectFromFileOffset(path, objectname[0], 0, 16);
  if (code != 0) {
    (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]);
    TAOS_CHECK_GOTO(code, &lino, _next);
  }
  (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]);

  // list buckets
  (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName);
  // code = s3ListBucketByEp(tsS3BucketName, i);
  code = azListBucket(tsS3BucketName);
  if (code != 0) {
    (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
    TAOS_CHECK_GOTO(code, &lino, _next);
  }
  (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);

  // test range get
  (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len);
  code = azGetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock);
  if (code != 0) {
    (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]);
    TAOS_CHECK_GOTO(code, &lino, _next);
  }
  (void)memcpy(buf, pBlock, c_len);
  taosMemoryFree(pBlock);
  (void)fprintf(stderr, "object content: %s\n", buf);
  (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]);

  // delete test object
  (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]);
  // code = azDeleteObjectsByPrefix(objectname[0]);
  azDeleteObjectsByPrefix(objectname[0]);
  /*
  if (code != 0) {
    (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
    TAOS_CHECK_GOTO(code, &lino, _next);
  }
  */
  (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);

_next:
  // NOTE(review): assumes taosCloseFile(&fp) nulls fp on the success
  // path above, so this close only fires when a goto skipped it — confirm.
  if (fp) {
    (void)taosCloseFile(&fp);
  }

  if (TSDB_CODE_SUCCESS != code) {
    (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i);
  }

  (void)fprintf(stdout, "=================================================================\n");
  //}

  // azEnd();

  TAOS_RETURN(code);
}
int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
int32_t code = 0;
std::string endpointUrl = tsS3Hostname[0]; // GetEndpointUrl();
std::string accountName = tsS3AccessKeyId[0]; // GetAccountName();
std::string accountKey = tsS3AccessKeySecret[0]; // GetAccountKey();
try {
auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);
std::string accountURL = tsS3Hostname[0];
StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
accountURL = "https://" + accountURL;
BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);
std::string containerName = tsS3BucketName;
auto containerClient = blobServiceClient.GetBlobContainerClient(containerName);
// Create the container if it does not exist
// std::cout << "Creating container: " << containerName << std::endl;
// containerClient.CreateIfNotExists();
std::string blobName = "blob.txt";
uint8_t blobContent[] = "Hello Azure!";
// Create the block blob client
// BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
// TDBlockBlobClient blobClient(containerClient.GetBlobClient(blobName));
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
// Upload the blob
// std::cout << "Uploading blob: " << blobName << std::endl;
// blobClient.UploadFrom(blobContent, sizeof(blobContent));
blobClient.UploadFrom(file, offset, size);
//(void)_azUploadFrom(blobClient, file, offset, size);
/*
auto blockBlobClient = BlockBlobClient(endpointUrl, sharedKeyCredential);
// Create some data to upload into the blob.
std::vector<uint8_t> data = {1, 2, 3, 4};
Azure::Core::IO::MemoryBodyStream stream(data);
Azure::Response<Models::UploadBlockBlobResult> response = blockBlobClient.Upload(stream);
Models::UploadBlockBlobResult model = response.Value;
std::cout << "Last modified date of uploaded blob: " << model.LastModified.ToString()
<< std::endl;
*/
} catch (const Azure::Core::RequestFailedException &e) {
/*
std::cout << "Status Code: " << static_cast<int>(e.StatusCode) << ", Reason Phrase: " << e.ReasonPhrase
<< std::endl;
std::cout << e.what() << std::endl;
*/
code = TAOS_SYSTEM_ERROR(EIO);
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
}
TAOS_RETURN(code);
}
int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
int32_t code = TSDB_CODE_SUCCESS;
std::string accountName = tsS3AccessKeyId[0];
std::string accountKey = tsS3AccessKeySecret[0];
std::string accountURL = tsS3Hostname[0];
accountURL = "https://" + accountURL;
try {
auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);
StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);
std::string containerName = tsS3BucketName;
auto containerClient = blobServiceClient.GetBlobContainerClient(containerName);
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
uint8_t *buf = (uint8_t *)taosMemoryCalloc(1, size);
if (!buf) {
return terrno;
}
Blobs::DownloadBlobToOptions options;
// options.TransferOptions.Concurrency = concurrency;
// if (offset.HasValue() || length.HasValue()) {
options.Range = Azure::Core::Http::HttpRange();
options.Range.Value().Offset = offset;
options.Range.Value().Length = size;
//}
/*
if (initialChunkSize.HasValue()) {
options.TransferOptions.InitialChunkSize = initialChunkSize.Value();
}
if (chunkSize.HasValue()) {
options.TransferOptions.ChunkSize = chunkSize.Value();
}
*/
auto res = blobClient.DownloadTo(buf, size, options);
if (check && res.Value.ContentRange.Length.Value() != size) {
code = TAOS_SYSTEM_ERROR(EIO);
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
}
*ppBlock = buf;
} catch (const Azure::Core::RequestFailedException &e) {
uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast<int>(e.StatusCode),
e.ReasonPhrase.c_str());
code = TAOS_SYSTEM_ERROR(EIO);
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
}
TAOS_RETURN(code);
}
// Deletes every blob in the configured container whose name starts with
// `prefix`. Best-effort: request failures are logged and swallowed,
// matching the caller contract (void return).
void azDeleteObjectsByPrefix(const char *prefix) {
  const std::string delimiter = "/";
  std::string       accountName = tsS3AccessKeyId[0];
  std::string       accountKey = tsS3AccessKeySecret[0];
  std::string       accountURL = tsS3Hostname[0];
  accountURL = "https://" + accountURL;

  try {
    // Shared-key auth; the shared_ptr owns the credential (a previous
    // revision also leaked a raw `new StorageSharedKeyCredential` here).
    auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);

    BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);

    std::string containerName = tsS3BucketName;
    auto        containerClient = blobServiceClient.GetBlobContainerClient(containerName);

    // Collect all matching names first, then delete, so a delete during
    // paging cannot disturb the listing.
    Azure::Storage::Blobs::ListBlobsOptions options;
    options.Prefix = prefix;

    std::set<std::string> listBlobs;
    for (auto pageResult = containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) {
      for (const auto &blob : pageResult.Blobs) {
        listBlobs.insert(blob.Name);
      }
    }

    // const ref: the original copied each std::string per iteration.
    for (const auto &blobName : listBlobs) {
      auto blobClient = containerClient.GetAppendBlobClient(blobName);
      blobClient.Delete();
    }
  } catch (const Azure::Core::RequestFailedException &e) {
    uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast<int>(e.StatusCode),
           e.ReasonPhrase.c_str());
  }
}
// TODO: the four operations below are declared in az.h but not yet
// implemented for the Azure backend; they currently succeed as no-ops.
int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; }
int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; }
#else
// Stubs compiled when USE_S3 is not defined: every entry point succeeds
// as a no-op so callers need no conditional compilation.
int32_t azBegin() { return TSDB_CODE_SUCCESS; }

void azEnd() {}

int32_t azCheckCfg() { return TSDB_CODE_SUCCESS; }

int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
  return TSDB_CODE_SUCCESS;
}

int32_t azGetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
  return TSDB_CODE_SUCCESS;
}

void azDeleteObjectsByPrefix(const char *prefix) {}

int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }

int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; }

int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; }

int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; }
#endif

View File

@ -0,0 +1,625 @@
#if defined(USE_S3)
#include "td_block_blob_client.hpp"
#include <azure/core/platform.hpp>
#if defined(AZ_PLATFORM_WINDOWS)
#if !defined(WIN32_LEAN_AND_MEAN)
#define WIN32_LEAN_AND_MEAN
#endif
#if !defined(NOMINMAX)
#define NOMINMAX
#endif
#include <windows.h>
#endif
#include "./avro_parser.hpp"
#include <azure/core/io/body_stream.hpp>
#include <azure/storage/common/crypt.hpp>
#include <azure/storage/common/internal/concurrent_transfer.hpp>
#include <azure/storage/common/internal/constants.hpp>
#include <azure/storage/common/internal/file_io.hpp>
#include <azure/storage/common/internal/storage_switch_to_secondary_policy.hpp>
#include <azure/storage/common/storage_common.hpp>
#include <azure/storage/common/storage_exception.hpp>
namespace Azure {
namespace Storage {
namespace Blobs {
// Builds a TDBlockBlobClient from a storage connection string by
// delegating to the base BlobClient factory and wrapping its result.
TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::string& connectionString,
                                                               const std::string& blobContainerName,
                                                               const std::string& blobName,
                                                               const BlobClientOptions& options) {
  return TDBlockBlobClient(
      BlobClient::CreateFromConnectionString(connectionString, blobContainerName, blobName, options));
}
// Construct with shared-key credentials.
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr<StorageSharedKeyCredential> credential,
                                     const BlobClientOptions& options)
    : BlobClient(blobUrl, std::move(credential), options) {}

// Construct with an Azure AD token credential.
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl,
                                     std::shared_ptr<Core::Credentials::TokenCredential> credential,
                                     const BlobClientOptions& options)
    : BlobClient(blobUrl, std::move(credential), options) {}

// Construct for anonymous access (e.g. SAS embedded in the URL).
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options)
    : BlobClient(blobUrl, options) {}

// Wrap an existing BlobClient, adopting its pipeline and URL.
TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {}
// Returns a copy of this client targeting the given snapshot; an empty
// snapshot string clears any snapshot selection from the URL.
TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const {
  TDBlockBlobClient clientCopy(*this);
  if (!snapshot.empty()) {
    clientCopy.m_blobUrl.AppendQueryParameter(_internal::HttpQuerySnapshot,
                                              _internal::UrlEncodeQueryParameter(snapshot));
  } else {
    clientCopy.m_blobUrl.RemoveQueryParameter(_internal::HttpQuerySnapshot);
  }
  return clientCopy;
}
// Returns a copy of this client targeting the given blob version; an
// empty version id clears any version selection from the URL.
TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId) const {
  TDBlockBlobClient clientCopy(*this);
  if (!versionId.empty()) {
    clientCopy.m_blobUrl.AppendQueryParameter(_internal::HttpQueryVersionId,
                                              _internal::UrlEncodeQueryParameter(versionId));
  } else {
    clientCopy.m_blobUrl.RemoveQueryParameter(_internal::HttpQueryVersionId);
  }
  return clientCopy;
}
// Single-request block-blob upload: maps the public options onto the
// protocol-layer options and forwards the body stream to the generated
// REST client. Suitable only for payloads within the single-put limit;
// larger payloads go through UploadFrom's staged-block path.
Azure::Response<Models::UploadBlockBlobResult> TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content,
                                                                         const UploadBlockBlobOptions& options,
                                                                         const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions;
  // Transactional hash: exactly one of MD5/CRC64 is forwarded.
  if (options.TransactionalContentHash.HasValue()) {
    if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
      protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
    } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
      protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
    }
  }
  // HTTP headers stored with the blob.
  protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
  protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
  protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
  protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
  protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
  protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
  protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
  protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
  protocolLayerOptions.Tier = options.AccessTier;
  // Conditional-request (access condition) headers.
  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
  protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
  protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
  protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
  protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
  // Customer-provided encryption key, if configured on this client.
  if (m_customerProvidedKey.HasValue()) {
    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
  }
  protocolLayerOptions.EncryptionScope = m_encryptionScope;
  // Immutability policy / legal hold passthrough.
  if (options.ImmutabilityPolicy.HasValue()) {
    protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
    protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
  }
  protocolLayerOptions.LegalHold = options.HasLegalHold;

  return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
// Uploads a memory buffer as a block blob. Small payloads (at or below
// TransferOptions.SingleUploadThreshold) go up in a single Upload call;
// larger ones are split into chunks, staged concurrently as blocks, and
// committed as one block list.
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
    const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options,
    const Azure::Core::Context& context) const {
  // Service limits: default block 4 MiB, max block ~4000 MiB, at most
  // 50000 blocks per blob; chunk sizes are rounded up to 1 MiB grains.
  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
  constexpr int64_t MaxBlockNumber = 50000;
  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;

  if (static_cast<uint64_t>(options.TransferOptions.SingleUploadThreshold) > (std::numeric_limits<size_t>::max)()) {
    throw Azure::Core::RequestFailedException("Single upload threshold is too big");
  }

  // Fast path: one PUT for payloads under the single-upload threshold.
  if (bufferSize <= static_cast<size_t>(options.TransferOptions.SingleUploadThreshold)) {
    Azure::Core::IO::MemoryBodyStream contentStream(buffer, bufferSize);
    UploadBlockBlobOptions uploadBlockBlobOptions;
    uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
    uploadBlockBlobOptions.Metadata = options.Metadata;
    uploadBlockBlobOptions.Tags = options.Tags;
    uploadBlockBlobOptions.AccessTier = options.AccessTier;
    uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
    uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
    return Upload(contentStream, uploadBlockBlobOptions, context);
  }

  // Pick a chunk size: the caller's explicit choice, or the smallest
  // grain-aligned size that fits within the 50000-block limit (at least
  // the 4 MiB default).
  int64_t chunkSize;
  if (options.TransferOptions.ChunkSize.HasValue()) {
    chunkSize = options.TransferOptions.ChunkSize.Value();
  } else {
    int64_t minChunkSize = (bufferSize + MaxBlockNumber - 1) / MaxBlockNumber;
    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
  }
  if (chunkSize > MaxStageBlockSize) {
    throw Azure::Core::RequestFailedException("Block size is too big.");
  }

  std::vector<std::string> blockIds;
  // Block ids must be equal-length base64 strings; zero-pad the chunk
  // index to a fixed width before encoding.
  auto getBlockId = [](int64_t id) {
    constexpr size_t BlockIdLength = 64;
    std::string blockId = std::to_string(id);
    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
  };

  // Stage one chunk; the last chunk resizes blockIds so the commit loop
  // below knows the total count. Ids themselves are filled in after all
  // transfers complete.
  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
    Azure::Core::IO::MemoryBodyStream contentStream(buffer + offset, static_cast<size_t>(length));
    StageBlockOptions chunkOptions;
    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
    if (chunkId == numChunks - 1) {
      blockIds.resize(static_cast<size_t>(numChunks));
    }
  };

  _internal::ConcurrentTransfer(0, bufferSize, chunkSize, options.TransferOptions.Concurrency, uploadBlockFunc);

  for (size_t i = 0; i < blockIds.size(); ++i) {
    blockIds[i] = getBlockId(static_cast<int64_t>(i));
  }

  // Commit the staged blocks in order, carrying the blob-level options.
  CommitBlockListOptions commitBlockListOptions;
  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
  commitBlockListOptions.Metadata = options.Metadata;
  commitBlockListOptions.Tags = options.Tags;
  commitBlockListOptions.AccessTier = options.AccessTier;
  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);

  Models::UploadBlockBlobFromResult ret;
  ret.ETag = std::move(commitBlockListResponse.Value.ETag);
  ret.LastModified = std::move(commitBlockListResponse.Value.LastModified);
  ret.VersionId = std::move(commitBlockListResponse.Value.VersionId);
  ret.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
  ret.EncryptionKeySha256 = std::move(commitBlockListResponse.Value.EncryptionKeySha256);
  ret.EncryptionScope = std::move(commitBlockListResponse.Value.EncryptionScope);
  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(ret),
                                                            std::move(commitBlockListResponse.RawResponse));
}
// Uploads an entire local file as a block blob. Files at or below the
// single-upload threshold are sent with one Upload call; larger files
// are staged as concurrent blocks read directly from the file handle
// and then committed as one block list.
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
    const std::string& fileName, const UploadBlockBlobFromOptions& options, const Azure::Core::Context& context) const {
  // Service limits: default block 4 MiB, max block ~4000 MiB, at most
  // 50000 blocks per blob; chunk sizes are rounded up to 1 MiB grains.
  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
  constexpr int64_t MaxBlockNumber = 50000;
  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;

  {
    // Scoped so the whole-file stream is closed before the chunked path
    // reopens the file through FileReader.
    Azure::Core::IO::FileBodyStream contentStream(fileName);
    if (contentStream.Length() <= options.TransferOptions.SingleUploadThreshold) {
      UploadBlockBlobOptions uploadBlockBlobOptions;
      uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
      uploadBlockBlobOptions.Metadata = options.Metadata;
      uploadBlockBlobOptions.Tags = options.Tags;
      uploadBlockBlobOptions.AccessTier = options.AccessTier;
      uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
      uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
      return Upload(contentStream, uploadBlockBlobOptions, context);
    }
  }

  std::vector<std::string> blockIds;
  // Block ids must be equal-length base64 strings; zero-pad the chunk
  // index to a fixed width before encoding.
  auto getBlockId = [](int64_t id) {
    constexpr size_t BlockIdLength = 64;
    std::string blockId = std::to_string(id);
    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
  };

  _internal::FileReader fileReader(fileName);

  // Stage one chunk from the file; the last chunk resizes blockIds so
  // the commit loop below knows the total count.
  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, length);
    StageBlockOptions chunkOptions;
    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
    if (chunkId == numChunks - 1) {
      blockIds.resize(static_cast<size_t>(numChunks));
    }
  };

  // Pick a chunk size: the caller's explicit choice, or the smallest
  // grain-aligned size that fits within the 50000-block limit.
  int64_t chunkSize;
  if (options.TransferOptions.ChunkSize.HasValue()) {
    chunkSize = options.TransferOptions.ChunkSize.Value();
  } else {
    int64_t minChunkSize = (fileReader.GetFileSize() + MaxBlockNumber - 1) / MaxBlockNumber;
    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
  }
  if (chunkSize > MaxStageBlockSize) {
    throw Azure::Core::RequestFailedException("Block size is too big.");
  }

  _internal::ConcurrentTransfer(0, fileReader.GetFileSize(), chunkSize, options.TransferOptions.Concurrency,
                                uploadBlockFunc);

  for (size_t i = 0; i < blockIds.size(); ++i) {
    blockIds[i] = getBlockId(static_cast<int64_t>(i));
  }

  // Commit the staged blocks in order, carrying the blob-level options.
  CommitBlockListOptions commitBlockListOptions;
  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
  commitBlockListOptions.Metadata = options.Metadata;
  commitBlockListOptions.Tags = options.Tags;
  commitBlockListOptions.AccessTier = options.AccessTier;
  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);

  Models::UploadBlockBlobFromResult result;
  result.ETag = commitBlockListResponse.Value.ETag;
  result.LastModified = commitBlockListResponse.Value.LastModified;
  result.VersionId = commitBlockListResponse.Value.VersionId;
  result.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
  result.EncryptionKeySha256 = commitBlockListResponse.Value.EncryptionKeySha256;
  result.EncryptionScope = commitBlockListResponse.Value.EncryptionScope;
  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
                                                            std::move(commitBlockListResponse.RawResponse));
}
// Uploads the file region [offset, offset + size) to this block blob.
//
// Two paths:
//  - If the region is no larger than TransferOptions.SingleUploadThreshold it
//    is sent in one Upload (Put Blob) request.
//  - Otherwise it is split into chunks that are staged via StageBlock (driven
//    by _internal::ConcurrentTransfer) and then committed with CommitBlockList.
//
// @param fileName local file to read from
// @param offset   byte offset within the file at which the upload starts
// @param size     number of bytes to upload
// @param options  transfer tuning (chunk size, concurrency) plus blob
//                 headers/metadata/tags applied to the resulting blob
// @param context  cancellation/tracing context propagated to every request
// @return the commit (or single-upload) result for the new blob
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
    const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options,
    const Azure::Core::Context& context) const {
  _internal::FileReader fileReader(fileName);

  // Single-shot path: stream the whole region in one request. The inner scope
  // bounds the body stream's lifetime to this attempt.
  {
    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, size);
    if (size <= options.TransferOptions.SingleUploadThreshold) {
      UploadBlockBlobOptions uploadBlockBlobOptions;
      uploadBlockBlobOptions.HttpHeaders = options.HttpHeaders;
      uploadBlockBlobOptions.Metadata = options.Metadata;
      uploadBlockBlobOptions.Tags = options.Tags;
      uploadBlockBlobOptions.AccessTier = options.AccessTier;
      uploadBlockBlobOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
      uploadBlockBlobOptions.HasLegalHold = options.HasLegalHold;
      return Upload(contentStream, uploadBlockBlobOptions, context);
    }
  }

  // Chunked path. Block ids are the zero-padded chunk index, base64-encoded,
  // so that every block id has identical length (required by Put Block List).
  std::vector<std::string> blockIds;
  auto getBlockId = [](int64_t id) {
    constexpr size_t BlockIdLength = 64;
    std::string blockId = std::to_string(id);
    blockId = std::string(BlockIdLength - blockId.length(), '0') + blockId;
    return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(blockId.begin(), blockId.end()));
  };
  auto uploadBlockFunc = [&](int64_t offset, int64_t length, int64_t chunkId, int64_t numChunks) {
    Azure::Core::IO::_internal::RandomAccessFileBodyStream contentStream(fileReader.GetHandle(), offset, length);
    StageBlockOptions chunkOptions;
    // Result is intentionally discarded: staging success is signaled by the
    // absence of an exception from StageBlock.
    auto blockInfo = StageBlock(getBlockId(chunkId), contentStream, chunkOptions, context);
    // NOTE(review): blockIds is sized only by whichever task handles the last
    // chunk id. This assumes ConcurrentTransfer's callbacks do not race on
    // this resize and that all callbacks finish before the fill-in loop below
    // runs — confirm against ConcurrentTransfer's contract.
    if (chunkId == numChunks - 1) {
      blockIds.resize(static_cast<size_t>(numChunks));
    }
  };

  // Chunk-size selection: honor an explicit ChunkSize; otherwise pick the
  // smallest multiple of BlockGrainSize that keeps the total block count
  // within MaxBlockNumber, but never below DefaultStageBlockSize.
  constexpr int64_t DefaultStageBlockSize = 4 * 1024 * 1024ULL;
  constexpr int64_t MaxStageBlockSize = 4000 * 1024 * 1024ULL;
  constexpr int64_t MaxBlockNumber = 50000;
  constexpr int64_t BlockGrainSize = 1 * 1024 * 1024;
  int64_t chunkSize;
  if (options.TransferOptions.ChunkSize.HasValue()) {
    chunkSize = options.TransferOptions.ChunkSize.Value();
  } else {
    int64_t minChunkSize = (size + MaxBlockNumber - 1) / MaxBlockNumber;
    minChunkSize = (minChunkSize + BlockGrainSize - 1) / BlockGrainSize * BlockGrainSize;
    chunkSize = (std::max)(DefaultStageBlockSize, minChunkSize);
  }
  if (chunkSize > MaxStageBlockSize) {
    throw Azure::Core::RequestFailedException("Block size is too big.");
  }

  // Stage all chunks (possibly concurrently), regenerate the ordered id list,
  // then commit it together with the blob-level properties.
  _internal::ConcurrentTransfer(offset, size, chunkSize, options.TransferOptions.Concurrency, uploadBlockFunc);
  for (size_t i = 0; i < blockIds.size(); ++i) {
    blockIds[i] = getBlockId(static_cast<int64_t>(i));
  }
  CommitBlockListOptions commitBlockListOptions;
  commitBlockListOptions.HttpHeaders = options.HttpHeaders;
  commitBlockListOptions.Metadata = options.Metadata;
  commitBlockListOptions.Tags = options.Tags;
  commitBlockListOptions.AccessTier = options.AccessTier;
  commitBlockListOptions.ImmutabilityPolicy = options.ImmutabilityPolicy;
  commitBlockListOptions.HasLegalHold = options.HasLegalHold;
  auto commitBlockListResponse = CommitBlockList(blockIds, commitBlockListOptions, context);
  // Surface the commit result through the UploadFrom-specific result type.
  Models::UploadBlockBlobFromResult result;
  result.ETag = commitBlockListResponse.Value.ETag;
  result.LastModified = commitBlockListResponse.Value.LastModified;
  result.VersionId = commitBlockListResponse.Value.VersionId;
  result.IsServerEncrypted = commitBlockListResponse.Value.IsServerEncrypted;
  result.EncryptionKeySha256 = commitBlockListResponse.Value.EncryptionKeySha256;
  result.EncryptionScope = commitBlockListResponse.Value.EncryptionScope;
  return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
                                                            std::move(commitBlockListResponse.RawResponse));
}
// Creates this block blob by copying the contents of sourceUri in a single
// server-side operation (Put Blob From URL). Maps the caller-facing options
// one-to-one onto the generated protocol-layer options and dispatches.
//
// @param sourceUri the source blob/file URL the service copies from
// @param options   destination headers/metadata/tags, access conditions for
//                  both destination and source, and optional source auth
// @param context   cancellation/tracing context for the request
// @return the service's result for the newly created blob
Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadFromUri(
    const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options,
    const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::UploadBlockBlobFromUriOptions protocolLayerOptions;
  protocolLayerOptions.CopySource = sourceUri;
  protocolLayerOptions.CopySourceBlobProperties = options.CopySourceBlobProperties;
  // Destination blob HTTP headers.
  protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
  protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
  protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
  protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
  protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
  protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
  protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
  protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
  protocolLayerOptions.Tier = options.AccessTier;
  // Destination access conditions.
  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
  protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
  protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
  protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
  protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
  // Source access conditions.
  protocolLayerOptions.SourceIfMatch = options.SourceAccessConditions.IfMatch;
  protocolLayerOptions.SourceIfNoneMatch = options.SourceAccessConditions.IfNoneMatch;
  protocolLayerOptions.SourceIfModifiedSince = options.SourceAccessConditions.IfModifiedSince;
  protocolLayerOptions.SourceIfUnmodifiedSince = options.SourceAccessConditions.IfUnmodifiedSince;
  protocolLayerOptions.SourceIfTags = options.SourceAccessConditions.TagConditions;
  // Optional transactional hash of the source content; only one algorithm can
  // be supplied per request.
  if (options.TransactionalContentHash.HasValue()) {
    if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
      protocolLayerOptions.SourceContentMD5 = options.TransactionalContentHash.Value().Value;
    } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
      protocolLayerOptions.SourceContentcrc64 = options.TransactionalContentHash.Value().Value;
    }
  }
  // Customer-provided encryption key, when configured on this client.
  if (m_customerProvidedKey.HasValue()) {
    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
  }
  protocolLayerOptions.EncryptionScope = m_encryptionScope;
  protocolLayerOptions.CopySourceTags = options.CopySourceTagsMode;
  if (!options.SourceAuthorization.empty()) {
    protocolLayerOptions.CopySourceAuthorization = options.SourceAuthorization;
  }
  return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
// Stages one uncommitted block (Put Block) for this blob: translates the
// caller-facing options into the generated protocol-layer options and forwards
// the body stream to the protocol layer.
Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const std::string& blockId,
                                                                        Azure::Core::IO::BodyStream& content,
                                                                        const StageBlockOptions& options,
                                                                        const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::StageBlockBlobBlockOptions proto;
  proto.BlockId = blockId;
  proto.LeaseId = options.AccessConditions.LeaseId;
  // Optional transactional hash of the block body; at most one algorithm.
  if (options.TransactionalContentHash.HasValue()) {
    const auto& hash = options.TransactionalContentHash.Value();
    if (hash.Algorithm == HashAlgorithm::Md5) {
      proto.TransactionalContentMD5 = hash.Value;
    } else if (hash.Algorithm == HashAlgorithm::Crc64) {
      proto.TransactionalContentCrc64 = hash.Value;
    }
  }
  // Customer-provided encryption key, when configured on this client.
  if (m_customerProvidedKey.HasValue()) {
    const auto& cpk = m_customerProvidedKey.Value();
    proto.EncryptionKey = cpk.Key;
    proto.EncryptionKeySha256 = cpk.KeyHash;
    proto.EncryptionAlgorithm = cpk.Algorithm.ToString();
  }
  proto.EncryptionScope = m_encryptionScope;
  return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, proto, context);
}
// Stages one block by copying a byte range from sourceUri (Put Block From
// URL). Maps the public options onto the generated protocol-layer options,
// including the HTTP "bytes=start-end" range header when a source range is
// requested.
Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFromUri(
    const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options,
    const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::StageBlockBlobBlockFromUriOptions proto;
  proto.BlockId = blockId;
  proto.SourceUrl = sourceUri;
  // Render the optional source range as "bytes=<first>-" or
  // "bytes=<first>-<last>" (inclusive last byte).
  if (options.SourceRange.HasValue()) {
    const auto& range = options.SourceRange.Value();
    std::string rangeStr = "bytes=" + std::to_string(range.Offset) + "-";
    if (range.Length.HasValue()) {
      rangeStr += std::to_string(range.Offset + range.Length.Value() - 1);
    }
    proto.SourceRange = rangeStr;
  }
  // Optional transactional hash of the source content; at most one algorithm.
  if (options.TransactionalContentHash.HasValue()) {
    const auto& hash = options.TransactionalContentHash.Value();
    if (hash.Algorithm == HashAlgorithm::Md5) {
      proto.SourceContentMD5 = hash.Value;
    } else if (hash.Algorithm == HashAlgorithm::Crc64) {
      proto.SourceContentcrc64 = hash.Value;
    }
  }
  proto.LeaseId = options.AccessConditions.LeaseId;
  // Source access conditions.
  proto.SourceIfModifiedSince = options.SourceAccessConditions.IfModifiedSince;
  proto.SourceIfUnmodifiedSince = options.SourceAccessConditions.IfUnmodifiedSince;
  proto.SourceIfMatch = options.SourceAccessConditions.IfMatch;
  proto.SourceIfNoneMatch = options.SourceAccessConditions.IfNoneMatch;
  // Customer-provided encryption key, when configured on this client.
  if (m_customerProvidedKey.HasValue()) {
    const auto& cpk = m_customerProvidedKey.Value();
    proto.EncryptionKey = cpk.Key;
    proto.EncryptionKeySha256 = cpk.KeyHash;
    proto.EncryptionAlgorithm = cpk.Algorithm.ToString();
  }
  proto.EncryptionScope = m_encryptionScope;
  if (!options.SourceAuthorization.empty()) {
    proto.CopySourceAuthorization = options.SourceAuthorization;
  }
  return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, proto, context);
}
// Commits a list of previously staged blocks (Put Block List), making them the
// blob's content. All ids are committed as "Latest" (most recently staged
// version of each block).
//
// @param blockIds ordered list of base64 block ids to commit
// @param options  blob headers/metadata/tags, tier, access conditions, and
//                 optional immutability/legal-hold settings
// @param context  cancellation/tracing context for the request
// @return the service's commit result (ETag, last-modified, etc.)
Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockList(
    const std::vector<std::string>& blockIds, const CommitBlockListOptions& options,
    const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::CommitBlockBlobBlockListOptions protocolLayerOptions;
  protocolLayerOptions.Blocks.Latest = blockIds;
  // Blob HTTP headers.
  protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
  protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
  protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
  protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
  protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
  protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
  protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
  protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
  protocolLayerOptions.Tier = options.AccessTier;
  // Destination access conditions.
  protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
  protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
  protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
  protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
  protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
  protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
  // Customer-provided encryption key, when configured on this client.
  if (m_customerProvidedKey.HasValue()) {
    protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
    protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
    protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
  }
  protocolLayerOptions.EncryptionScope = m_encryptionScope;
  // Optional immutability policy and legal hold on the committed blob.
  if (options.ImmutabilityPolicy.HasValue()) {
    protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
    protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
  }
  protocolLayerOptions.LegalHold = options.HasLegalHold;
  return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
// Retrieves the blob's block list (committed and/or uncommitted, per
// options.ListType), issuing the request against a replica-aware context.
Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options,
                                                                            const Azure::Core::Context& context) const {
  _detail::BlockBlobClient::GetBlockBlobBlockListOptions proto;
  proto.ListType = options.ListType;
  proto.LeaseId = options.AccessConditions.LeaseId;
  proto.IfTags = options.AccessConditions.TagConditions;
  return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, proto, _internal::WithReplicaStatus(context));
}
/*
Azure::Response<Models::QueryBlobResult> TDBlockBlobClient::Query(const std::string& querySqlExpression,
const QueryBlobOptions& options,
const Azure::Core::Context& context) const {
_detail::BlobClient::QueryBlobOptions protocolLayerOptions;
protocolLayerOptions.QueryRequest.QueryType = Models::_detail::QueryRequestQueryType::SQL;
protocolLayerOptions.QueryRequest.Expression = querySqlExpression;
if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Delimited) {
Models::_detail::DelimitedTextConfiguration c;
c.RecordSeparator = options.InputTextConfiguration.m_recordSeparator;
c.ColumnSeparator = options.InputTextConfiguration.m_columnSeparator;
c.FieldQuote = options.InputTextConfiguration.m_quotationCharacter;
c.EscapeChar = options.InputTextConfiguration.m_escapeCharacter;
c.HeadersPresent = options.InputTextConfiguration.m_hasHeaders;
Models::_detail::QuerySerialization q;
q.Format.Type = options.InputTextConfiguration.m_format;
q.Format.DelimitedTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
} else if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Json) {
Models::_detail::JsonTextConfiguration c;
c.RecordSeparator = options.InputTextConfiguration.m_recordSeparator;
Models::_detail::QuerySerialization q;
q.Format.Type = options.InputTextConfiguration.m_format;
q.Format.JsonTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
} else if (options.InputTextConfiguration.m_format == Models::_detail::QueryFormatType::Parquet) {
Models::_detail::ParquetConfiguration c;
Models::_detail::QuerySerialization q;
q.Format.Type = options.InputTextConfiguration.m_format;
q.Format.ParquetTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.InputSerialization = std::move(q);
} else if (options.InputTextConfiguration.m_format.ToString().empty()) {
} else {
AZURE_UNREACHABLE_CODE();
}
if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Delimited) {
Models::_detail::DelimitedTextConfiguration c;
c.RecordSeparator = options.OutputTextConfiguration.m_recordSeparator;
c.ColumnSeparator = options.OutputTextConfiguration.m_columnSeparator;
c.FieldQuote = options.OutputTextConfiguration.m_quotationCharacter;
c.EscapeChar = options.OutputTextConfiguration.m_escapeCharacter;
c.HeadersPresent = options.OutputTextConfiguration.m_hasHeaders;
Models::_detail::QuerySerialization q;
q.Format.Type = options.OutputTextConfiguration.m_format;
q.Format.DelimitedTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Json) {
Models::_detail::JsonTextConfiguration c;
c.RecordSeparator = options.OutputTextConfiguration.m_recordSeparator;
Models::_detail::QuerySerialization q;
q.Format.Type = options.OutputTextConfiguration.m_format;
q.Format.JsonTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Parquet) {
Models::_detail::ParquetConfiguration c;
Models::_detail::QuerySerialization q;
q.Format.Type = options.OutputTextConfiguration.m_format;
q.Format.ParquetTextConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
} else if (options.OutputTextConfiguration.m_format == Models::_detail::QueryFormatType::Arrow) {
Models::_detail::ArrowConfiguration c;
c.Schema = options.OutputTextConfiguration.m_schema;
Models::_detail::QuerySerialization q;
q.Format.Type = options.OutputTextConfiguration.m_format;
q.Format.ArrowConfiguration = std::move(c);
protocolLayerOptions.QueryRequest.OutputSerialization = std::move(q);
} else if (options.InputTextConfiguration.m_format.ToString().empty()) {
} else {
AZURE_UNREACHABLE_CODE();
}
protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
if (m_customerProvidedKey.HasValue()) {
protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
}
protocolLayerOptions.EncryptionScope = m_encryptionScope;
protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
auto response =
_detail::BlobClient::Query(*m_pipeline, m_blobUrl, protocolLayerOptions, _internal::WithReplicaStatus(context));
const auto statusCode = response.RawResponse->GetStatusCode();
const auto reasonPhrase = response.RawResponse->GetReasonPhrase();
const auto requestId = response.RawResponse->GetHeaders().count(_internal::HttpHeaderRequestId) != 0
? response.RawResponse->GetHeaders().at(_internal::HttpHeaderRequestId)
: std::string();
const auto clientRequestId = response.RawResponse->GetHeaders().count(_internal::HttpHeaderClientRequestId) != 0
? response.RawResponse->GetHeaders().at(_internal::HttpHeaderClientRequestId)
: std::string();
auto defaultErrorHandler = [statusCode, reasonPhrase, requestId, clientRequestId](BlobQueryError e) {
if (e.IsFatal) {
StorageException exception("Fatal " + e.Name + " at " + std::to_string(e.Position));
exception.StatusCode = statusCode;
exception.ReasonPhrase = reasonPhrase;
exception.RequestId = requestId;
exception.ClientRequestId = clientRequestId;
exception.ErrorCode = e.Name;
exception.Message = e.Description;
throw exception;
}
};
response.Value.BodyStream =
std::make_unique<_detail::AvroStreamParser>(std::move(response.Value.BodyStream), options.ProgressHandler,
options.ErrorHandler ? options.ErrorHandler : defaultErrorHandler);
return response;
}
*/
} // namespace Blobs
} // namespace Storage
} // namespace Azure
#endif

View File

@ -0,0 +1,18 @@
# Unit-test target for the azure (az) module.
aux_source_directory(. AZ_TEST_SRC)
add_executable(azTest ${AZ_TEST_SRC})
# Public az headers plus the module's private headers one level up.
target_include_directories(azTest
  PUBLIC
  "${TD_SOURCE_DIR}/include/libs/azure"
  "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(azTest
  az
  gtest_main
)
# Register the executable with CTest so `ctest` picks it up.
enable_testing()
add_test(
  NAME az_test
  COMMAND azTest
)

View File

@ -0,0 +1,457 @@
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
/*
#include "walInt.h"
const char* ranStr = "tvapq02tcp";
const int ranStrLen = strlen(ranStr);
SWalSyncInfo syncMeta = {0};
class WalCleanEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void SetUp() override {
taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
taosMemoryFree(pCfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
class WalCleanDeleteEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void SetUp() override {
taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
taosMemoryFree(pCfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
class WalKeepEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg* pCfg = (SWalCfg*)taosMemoryMalloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
taosMemoryFree(pCfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
class WalRetentionEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;
cfg.vgId = 0;
cfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL);
ASSERT_EQ(pWal->fileInfoSet->size, 1);
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
ASSERT_EQ(pInfo->firstVer, 0);
ASSERT_EQ(pInfo->lastVer, -1);
ASSERT_EQ(pInfo->closeTs, -1);
ASSERT_EQ(pInfo->fileSize, 0);
}
TEST_F(WalCleanEnv, serialize) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
char* ss = NULL;
code = walMetaSerialize(pWal, &ss);
ASSERT(code == 0);
printf("%s\n", ss);
taosMemoryFree(ss);
code = walSaveMeta(pWal);
ASSERT(code == 0);
}
TEST_F(WalCleanEnv, removeOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walSaveMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walSaveMeta(pWal);
ASSERT(code == 0);
}
TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv();
int code;
syncMeta.isWeek = -1;
syncMeta.seqNum = UINT64_MAX;
syncMeta.term = UINT64_MAX;
for (int i = 0; i < 10; i++) {
code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
ASSERT_EQ(pWal->vers.lastVer, i);
}
char* oldss = NULL;
code = walMetaSerialize(pWal, &oldss);
ASSERT(code == 0);
TearDown();
SetUp();
ASSERT_EQ(pWal->vers.firstVer, 0);
ASSERT_EQ(pWal->vers.lastVer, 9);
char* newss = NULL;
code = walMetaSerialize(pWal, &newss);
ASSERT(code == 0);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for (int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
}
taosMemoryFree(oldss);
taosMemoryFree(newss);
}
TEST_F(WalCleanEnv, write) {
int code;
for (int i = 0; i < 10; i++) {
code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walAppendLog(pWal, i + 2, i, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
ASSERT_EQ(pWal->vers.lastVer, i);
}
code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanEnv, rollback) {
int code;
for (int i = 0; i < 10; i++) {
code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
code = walRollback(pWal, 12);
ASSERT_NE(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 9);
code = walRollback(pWal, 9);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 8);
code = walRollback(pWal, 5);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 4);
code = walRollback(pWal, 3);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 2);
code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanEnv, rollbackMultiFile) {
int code;
for (int i = 0; i < 10; i++) {
code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
if (i == 5) {
walBeginSnapshot(pWal, i, 0);
walEndSnapshot(pWal);
}
}
code = walRollback(pWal, 12);
ASSERT_NE(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 9);
code = walRollback(pWal, 9);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 8);
code = walRollback(pWal, 6);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 5);
code = walRollback(pWal, 5);
ASSERT_NE(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 5);
code = walAppendLog(pWal, 6, 6, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 6);
code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanDeleteEnv, roll) {
int code;
int i;
for (i = 0; i < 100; i++) {
code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->vers.commitVer, i);
}
walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
walEndSnapshot(pWal);
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walAppendLog(pWal, 5, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0);
for (; i < 200; i++) {
code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->vers.commitVer, i);
}
code = walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(code, 0);
code = walEndSnapshot(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
ASSERT(pRead != NULL);
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
walCloseReader(pRead);
}
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
TearDown();
// getchar();
char buf[100];
sprintf(buf, "%s/meta-ver%d", pathName, 0);
taosRemoveFile(buf);
sprintf(buf, "%s/meta-ver%d", pathName, 1);
taosRemoveFile(buf);
SetUp();
// getchar();
ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
for (i = 100; i < 200; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 200;
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver + 1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
walCloseReader(pRead);
}
*/

View File

@ -9,7 +9,7 @@ target_include_directories(
target_link_libraries(
tcs
# PUBLIC az
PUBLIC az
PUBLIC common
# PUBLIC cjson
# PUBLIC os

View File

@ -19,7 +19,7 @@
#include "taoserror.h"
#include "tglobal.h"
//#include "az.h"
#include "az.h"
#include "cos.h"
extern int8_t tsS3Ablob;
@ -68,7 +68,6 @@ int32_t tcsInit() {
tcs.DeleteObjects = s3DeleteObjects;
tcs.GetObjectToFile = s3GetObjectToFile;
} else if (TOS_PROTO_ABLOB == proto) {
/*
tcs.Begin = azBegin;
tcs.End = azEnd;
tcs.CheckCfg = azCheckCfg;
@ -82,7 +81,7 @@ int32_t tcsInit() {
tcs.GetObjectsByPrefix = azGetObjectsByPrefix;
tcs.DeleteObjects = azDeleteObjects;
tcs.GetObjectToFile = azGetObjectToFile;
*/
} else {
code = TSDB_CODE_INVALID_PARA;
return code;