From 9c7eaa1633920ef1f1c2594c6cd5690f75b9c9bd Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 22 Oct 2024 13:17:14 +0800 Subject: [PATCH] az/header: move header file to inc directory --- source/libs/azure/src/avro_parser.cpp | 531 ------------------ source/libs/azure/src/avro_parser.hpp | 198 ------- source/libs/azure/src/td_avro_parser.cpp | 2 +- .../libs/azure/src/td_block_blob_client.cpp | 2 +- 4 files changed, 2 insertions(+), 731 deletions(-) delete mode 100644 source/libs/azure/src/avro_parser.cpp delete mode 100644 source/libs/azure/src/avro_parser.hpp diff --git a/source/libs/azure/src/avro_parser.cpp b/source/libs/azure/src/avro_parser.cpp deleted file mode 100644 index 485980e007..0000000000 --- a/source/libs/azure/src/avro_parser.cpp +++ /dev/null @@ -1,531 +0,0 @@ -#if defined(USE_S3) -#include "avro_parser.hpp" - -#include -#include - -#include -#include - -namespace Azure { -namespace Storage { -namespace Blobs { -namespace _detail { - -namespace { -int64_t parseInt(AvroStreamReader::ReaderPos& data) { - uint64_t r = 0; - int nb = 0; - while (true) { - uint8_t c = (*data.BufferPtr)[data.Offset++]; - r = r | ((static_cast(c) & 0x7f) << (nb * 7)); - if (c & 0x80) { - ++nb; - continue; - } - break; - } - return static_cast(r >> 1) ^ -static_cast(r & 0x01); -} - -AvroSchema ParseSchemaFromJsonString(const std::string& jsonSchema) { - const static std::map BuiltinNameSchemaMap = { - {"string", AvroSchema::StringSchema}, {"bytes", AvroSchema::BytesSchema}, {"int", AvroSchema::IntSchema}, - {"long", AvroSchema::LongSchema}, {"float", AvroSchema::FloatSchema}, {"double", AvroSchema::DoubleSchema}, - {"boolean", AvroSchema::BoolSchema}, {"null", AvroSchema::NullSchema}, {"string", AvroSchema::StringSchema}, - }; - std::map nameSchemaMap = BuiltinNameSchemaMap; - - std::function parseSchemaFromJsonObject; - parseSchemaFromJsonObject = [&](const Core::Json::_internal::json& obj) -> AvroSchema { - if (obj.is_string()) { - auto typeName = obj.get(); - return nameSchemaMap.find(typeName)->second; - } else if (obj.is_array()) { - std::vector unionSchemas; - for (const auto& s : obj) { - unionSchemas.push_back(parseSchemaFromJsonObject(s)); - } - return AvroSchema::UnionSchema(std::move(unionSchemas)); - } else if (obj.is_object()) { - if (obj.count("namespace") != 0) { - throw std::runtime_error("Namespace isn't supported yet in Avro schema."); - } - if (obj.count("aliases") != 0) { - throw std::runtime_error("Alias isn't supported yet in Avro schema."); - } - auto typeName = obj["type"].get(); - auto i = nameSchemaMap.find(typeName); - if (i != nameSchemaMap.end()) { - return i->second; - } - if (typeName == "record") { - std::vector> fieldsSchema; - for (const auto& field : obj["fields"]) { - fieldsSchema.push_back( - std::make_pair(field["name"].get(), parseSchemaFromJsonObject(field["type"]))); - } - - const std::string recordName = obj["name"].get(); - auto recordSchema = AvroSchema::RecordSchema(recordName, std::move(fieldsSchema)); - nameSchemaMap.insert(std::make_pair(recordName, recordSchema)); - return recordSchema; - } else if (typeName == "enum") { - throw std::runtime_error("Enum type isn't supported yet in Avro schema."); - } else if (typeName == "array") { - return AvroSchema::ArraySchema(parseSchemaFromJsonObject(obj["items"])); - } else if (typeName == "map") { - return AvroSchema::MapSchema(parseSchemaFromJsonObject(obj["items"])); - } else if (typeName == "fixed") { - const std::string fixedName = obj["name"].get(); - auto fixedSchema = AvroSchema::FixedSchema(fixedName, obj["size"].get()); - nameSchemaMap.insert(std::make_pair(fixedName, fixedSchema)); - return fixedSchema; - } else { - throw std::runtime_error("Unrecognized type " + typeName + " in Avro schema."); - } - } - AZURE_UNREACHABLE_CODE(); - }; - - auto jsonRoot = Core::Json::_internal::json::parse(jsonSchema.begin(), jsonSchema.end()); - return parseSchemaFromJsonObject(jsonRoot); -} -} // namespace - -int64_t AvroStreamReader::ParseInt(const Core::Context& context) { - uint64_t r = 0; - int nb = 0; - while (true) { - Preload(1, context); - uint8_t c = m_streambuffer[m_pos.Offset++]; - - r = r | ((static_cast(c) & 0x7f) << (nb * 7)); - if (c & 0x80) { - ++nb; - continue; - } - break; - } - return static_cast(r >> 1) ^ -static_cast(r & 0x01); -} - -void AvroStreamReader::Advance(size_t n, const Core::Context& context) { - Preload(n, context); - m_pos.Offset += n; -} - -size_t AvroStreamReader::Preload(size_t n, const Core::Context& context) { - size_t oldAvailable = AvailableBytes(); - while (true) { - size_t newAvailable = TryPreload(n, context); - if (newAvailable >= n) { - return newAvailable; - } - if (oldAvailable == newAvailable) { - throw std::runtime_error("Unexpected EOF of Avro stream."); - } - oldAvailable = newAvailable; - } - AZURE_UNREACHABLE_CODE(); -} - -size_t AvroStreamReader::TryPreload(size_t n, const Core::Context& context) { - size_t availableBytes = AvailableBytes(); - if (availableBytes >= n) { - return availableBytes; - } - const size_t MinRead = 4096; - size_t tryReadSize = (std::max)(n, MinRead); - size_t currSize = m_streambuffer.size(); - m_streambuffer.resize(m_streambuffer.size() + tryReadSize); - size_t actualReadSize = m_stream->Read(m_streambuffer.data() + currSize, tryReadSize, context); - m_streambuffer.resize(currSize + actualReadSize); - return AvailableBytes(); -} - -void AvroStreamReader::Discard() { - constexpr size_t MinimumReleaseMemory = 128 * 1024; - if (m_pos.Offset < MinimumReleaseMemory) { - return; - } - const size_t availableBytes = AvailableBytes(); - std::memmove(&m_streambuffer[0], &m_streambuffer[m_pos.Offset], availableBytes); - m_streambuffer.resize(availableBytes); - m_pos.Offset = 0; -} - -const AvroSchema AvroSchema::StringSchema(AvroDatumType::String); -const AvroSchema AvroSchema::BytesSchema(AvroDatumType::Bytes); -const AvroSchema AvroSchema::IntSchema(AvroDatumType::Int); -const AvroSchema AvroSchema::LongSchema(AvroDatumType::Long); -const AvroSchema AvroSchema::FloatSchema(AvroDatumType::Float); -const AvroSchema AvroSchema::DoubleSchema(AvroDatumType::Double); -const AvroSchema AvroSchema::BoolSchema(AvroDatumType::Bool); -const AvroSchema AvroSchema::NullSchema(AvroDatumType::Null); - -AvroSchema AvroSchema::RecordSchema(std::string name, - const std::vector>& fieldsSchema) { - AvroSchema recordSchema(AvroDatumType::Record); - recordSchema.m_name = std::move(name); - recordSchema.m_status = std::make_shared(); - for (auto& i : fieldsSchema) { - recordSchema.m_status->m_keys.push_back(i.first); - recordSchema.m_status->m_schemas.push_back(i.second); - } - return recordSchema; -} - -AvroSchema AvroSchema::ArraySchema(AvroSchema elementSchema) { - AvroSchema arraySchema(AvroDatumType::Array); - arraySchema.m_status = std::make_shared(); - arraySchema.m_status->m_schemas.push_back(std::move(elementSchema)); - return arraySchema; -} - -AvroSchema AvroSchema::MapSchema(AvroSchema elementSchema) { - AvroSchema mapSchema(AvroDatumType::Map); - mapSchema.m_status = std::make_shared(); - mapSchema.m_status->m_schemas.push_back(std::move(elementSchema)); - return mapSchema; -} - -AvroSchema AvroSchema::UnionSchema(std::vector schemas) { - AvroSchema unionSchema(AvroDatumType::Union); - unionSchema.m_status = std::make_shared(); - unionSchema.m_status->m_schemas = std::move(schemas); - return unionSchema; -} - -AvroSchema AvroSchema::FixedSchema(std::string name, int64_t size) { - AvroSchema fixedSchema(AvroDatumType::Fixed); - fixedSchema.m_name = std::move(name); - fixedSchema.m_status = std::make_shared(); - fixedSchema.m_status->m_size = size; - return fixedSchema; -} - -void AvroDatum::Fill(AvroStreamReader& reader, const Core::Context& context) { - m_data = reader.m_pos; - if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { - int64_t stringSize = reader.ParseInt(context); - reader.Advance(static_cast(stringSize), context); - } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || - m_schema.Type() == AvroDatumType::Enum) { - reader.ParseInt(context); - } else if (m_schema.Type() == AvroDatumType::Float) { - reader.Advance(4, context); - } else if (m_schema.Type() == AvroDatumType::Double) { - reader.Advance(8, context); - } else if (m_schema.Type() == AvroDatumType::Bool) { - reader.Advance(1, context); - } else if (m_schema.Type() == AvroDatumType::Null) { - reader.Advance(0, context); - } else if (m_schema.Type() == AvroDatumType::Record) { - for (const auto& s : m_schema.FieldSchemas()) { - AvroDatum(s).Fill(reader, context); - } - } else if (m_schema.Type() == AvroDatumType::Array) { - while (true) { - int64_t numElementsInBlock = reader.ParseInt(context); - if (numElementsInBlock == 0) { - break; - } else if (numElementsInBlock < 0) { - int64_t blockSize = reader.ParseInt(context); - reader.Advance(static_cast(blockSize), context); - } else { - for (auto i = 0; i < numElementsInBlock; ++i) { - AvroDatum(m_schema.ItemSchema()).Fill(reader, context); - } - } - } - } else if (m_schema.Type() == AvroDatumType::Map) { - while (true) { - int64_t numElementsInBlock = reader.ParseInt(context); - if (numElementsInBlock == 0) { - break; - } else if (numElementsInBlock < 0) { - int64_t blockSize = reader.ParseInt(context); - reader.Advance(static_cast(blockSize), context); - } else { - for (int64_t i = 0; i < numElementsInBlock; ++i) { - AvroDatum(AvroSchema::StringSchema).Fill(reader, context); - AvroDatum(m_schema.ItemSchema()).Fill(reader, context); - } - } - } - } else if (m_schema.Type() == AvroDatumType::Union) { - int64_t i = reader.ParseInt(context); - AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(reader, context); - } else if (m_schema.Type() == AvroDatumType::Fixed) { - reader.Advance(m_schema.Size(), context); - } else { - AZURE_UNREACHABLE_CODE(); - } -} - -void AvroDatum::Fill(AvroStreamReader::ReaderPos& data) { - m_data = data; - if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { - int64_t stringSize = parseInt(data); - data.Offset += static_cast(stringSize); - } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || - m_schema.Type() == AvroDatumType::Enum) { - parseInt(data); - } else if (m_schema.Type() == AvroDatumType::Float) { - data.Offset += 4; - } else if (m_schema.Type() == AvroDatumType::Double) { - data.Offset += 8; - } else if (m_schema.Type() == AvroDatumType::Bool) { - data.Offset += 1; - } else if (m_schema.Type() == AvroDatumType::Null) { - data.Offset += 0; - } else if (m_schema.Type() == AvroDatumType::Record) { - for (const auto& s : m_schema.FieldSchemas()) { - AvroDatum(s).Fill(data); - } - } else if (m_schema.Type() == AvroDatumType::Array) { - while (true) { - int64_t numElementsInBlock = parseInt(data); - if (numElementsInBlock == 0) { - break; - } else if (numElementsInBlock < 0) { - int64_t blockSize = parseInt(data); - data.Offset += static_cast(blockSize); - } else { - for (auto i = 0; i < numElementsInBlock; ++i) { - AvroDatum(m_schema.ItemSchema()).Fill(data); - } - } - } - } else if (m_schema.Type() == AvroDatumType::Map) { - while (true) { - int64_t numElementsInBlock = parseInt(data); - if (numElementsInBlock == 0) { - break; - } else if (numElementsInBlock < 0) { - int64_t blockSize = parseInt(data); - data.Offset += static_cast(blockSize); - } else { - for (int64_t i = 0; i < numElementsInBlock; ++i) { - AvroDatum(AvroSchema::StringSchema).Fill(data); - AvroDatum(m_schema.ItemSchema()).Fill(data); - } - } - } - } else if (m_schema.Type() == AvroDatumType::Union) { - int64_t i = parseInt(data); - AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(data); - } else if (m_schema.Type() == AvroDatumType::Fixed) { - data.Offset += m_schema.Size(); - } else { - AZURE_UNREACHABLE_CODE(); - } -} - -template <> -AvroDatum::StringView AvroDatum::Value() const { - auto data = m_data; - if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { - const int64_t length = parseInt(data); - const uint8_t* start = &(*data.BufferPtr)[data.Offset]; - StringView ret{start, static_cast(length)}; - data.Offset += static_cast(length); - return ret; - } - if (m_schema.Type() == AvroDatumType::Fixed) { - const size_t fixedSize = m_schema.Size(); - const uint8_t* start = &(*data.BufferPtr)[data.Offset]; - StringView ret{start, fixedSize}; - data.Offset += fixedSize; - return ret; - } - AZURE_UNREACHABLE_CODE(); -} - -template <> -std::string AvroDatum::Value() const { - auto stringView = Value(); - return std::string(stringView.Data, stringView.Data + stringView.Length); -} - -template <> -std::vector AvroDatum::Value() const { - auto stringView = Value(); - return std::vector(stringView.Data, stringView.Data + stringView.Length); -} - -template <> -int64_t AvroDatum::Value() const { - auto data = m_data; - return parseInt(data); -} - -template <> -int32_t AvroDatum::Value() const { - return static_cast(Value()); -} - -template <> -bool AvroDatum::Value() const { - return Value(); -} - -template <> -std::nullptr_t AvroDatum::Value() const { - return nullptr; -} - -template <> -AvroRecord AvroDatum::Value() const { - auto data = m_data; - - AvroRecord r; - r.m_keys = &m_schema.FieldNames(); - for (const auto& schema : m_schema.FieldSchemas()) { - auto datum = AvroDatum(schema); - datum.Fill(data); - r.m_values.push_back(std::move(datum)); - } - - return r; -} - -template <> -AvroMap AvroDatum::Value() const { - auto data = m_data; - - AvroMap m; - while (true) { - int64_t numElementsInBlock = parseInt(data); - if (numElementsInBlock == 0) { - break; - } - if (numElementsInBlock < 0) { - numElementsInBlock = -numElementsInBlock; - parseInt(data); - } - for (int64_t i = 0; i < numElementsInBlock; ++i) { - auto keyDatum = AvroDatum(AvroSchema::StringSchema); - keyDatum.Fill(data); - auto valueDatum = AvroDatum(m_schema.ItemSchema()); - valueDatum.Fill(data); - m[keyDatum.Value()] = valueDatum; - } - } - return m; -} - -template <> -AvroDatum AvroDatum::Value() const { - auto data = m_data; - if (m_schema.Type() == AvroDatumType::Union) { - int64_t i = parseInt(data); - auto datum = AvroDatum(m_schema.FieldSchemas()[static_cast(i)]); - datum.Fill(data); - return datum; - } - AZURE_UNREACHABLE_CODE(); -} - -AvroObjectContainerReader::AvroObjectContainerReader(Core::IO::BodyStream& stream) - : m_reader(std::make_unique(stream)) {} - -AvroDatum AvroObjectContainerReader::NextImpl(const AvroSchema* schema, const Core::Context& context) { - AZURE_ASSERT_FALSE(m_eof); - static const auto SyncMarkerSchema = AvroSchema::FixedSchema("Sync", 16); - if (!schema) { - static AvroSchema FileHeaderSchema = []() { - std::vector> fieldsSchema; - fieldsSchema.push_back(std::make_pair("magic", AvroSchema::FixedSchema("Magic", 4))); - fieldsSchema.push_back(std::make_pair("meta", AvroSchema::MapSchema(AvroSchema::BytesSchema))); - fieldsSchema.push_back(std::make_pair("sync", SyncMarkerSchema)); - return AvroSchema::RecordSchema("org.apache.avro.file.Header", std::move(fieldsSchema)); - }(); - auto fileHeaderDatum = AvroDatum(FileHeaderSchema); - fileHeaderDatum.Fill(*m_reader, context); - auto fileHeader = fileHeaderDatum.Value(); - if (fileHeader.Field("magic").Value() != "Obj\01") { - throw std::runtime_error("Invalid Avro object container magic."); - } - AvroMap meta = fileHeader.Field("meta").Value(); - std::string objectSchemaJson = meta["avro.schema"].Value(); - std::string codec = "null"; - if (meta.count("avro.codec") != 0) { - codec = meta["avro.codec"].Value(); - } - if (codec != "null") { - throw std::runtime_error("Unsupported Avro codec: " + codec); - } - m_syncMarker = fileHeader.Field("sync").Value(); - m_objectSchema = std::make_unique(ParseSchemaFromJsonString(objectSchemaJson)); - schema = m_objectSchema.get(); - } - - if (m_remainingObjectInCurrentBlock == 0) { - m_reader->Discard(); - m_remainingObjectInCurrentBlock = m_reader->ParseInt(context); - int64_t ObjectsSize = m_reader->ParseInt(context); - m_reader->Preload(static_cast(ObjectsSize), context); - } - - auto objectDatum = AvroDatum(*m_objectSchema); - objectDatum.Fill(*m_reader, context); - if (--m_remainingObjectInCurrentBlock == 0) { - auto markerDatum = AvroDatum(SyncMarkerSchema); - markerDatum.Fill(*m_reader, context); - auto marker = markerDatum.Value(); - if (marker != m_syncMarker) { - throw std::runtime_error("Sync marker doesn't match."); - } - m_eof = m_reader->TryPreload(1, context) == 0; - } - return objectDatum; -} - -size_t AvroStreamParser::OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) { - if (m_parserBuffer.Length != 0) { - size_t bytesToCopy = (std::min)(m_parserBuffer.Length, count); - std::memcpy(buffer, m_parserBuffer.Data, bytesToCopy); - m_parserBuffer.Data += bytesToCopy; - m_parserBuffer.Length -= bytesToCopy; - return bytesToCopy; - } - while (!m_parser.End()) { - auto datum = m_parser.Next(context); - if (datum.Schema().Type() == AvroDatumType::Union) { - datum = datum.Value(); - } - if (datum.Schema().Type() != AvroDatumType::Record) { - continue; - } - if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.resultData") { - auto record = datum.Value(); - auto dataDatum = record.Field("data"); - m_parserBuffer = dataDatum.Value(); - return OnRead(buffer, count, context); - } - if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.progress" && m_progressCallback) { - auto record = datum.Value(); - auto bytesScanned = record.Field("bytesScanned").Value(); - auto totalBytes = record.Field("totalBytes").Value(); - m_progressCallback(bytesScanned, totalBytes); - } - if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.error" && m_errorCallback) { - auto record = datum.Value(); - BlobQueryError e; - e.Name = record.Field("name").Value(); - e.Description = record.Field("description").Value(); - e.IsFatal = record.Field("fatal").Value(); - e.Position = record.Field("position").Value(); - m_errorCallback(std::move(e)); - } - } - return 0; -} -} // namespace _detail -} // namespace Blobs -} // namespace Storage -} // namespace Azure - -#endif diff --git a/source/libs/azure/src/avro_parser.hpp b/source/libs/azure/src/avro_parser.hpp deleted file mode 100644 index 275d073c85..0000000000 --- a/source/libs/azure/src/avro_parser.hpp +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -#pragma once - -#include "azure/storage/blobs/blob_options.hpp" - -#include - -#include -#include -#include - -namespace Azure { namespace Storage { namespace Blobs { namespace _detail { - enum class AvroDatumType - { - String, - Bytes, - Int, - Long, - Float, - Double, - Bool, - Null, - Record, - Enum, - Array, - Map, - Union, - Fixed, - }; - - class AvroStreamReader final { - public: - // position of a vector that lives through vector resizing - struct ReaderPos final - { - const std::vector* BufferPtr = nullptr; - size_t Offset = 0; - }; - explicit AvroStreamReader(Core::IO::BodyStream& stream) - : m_stream(&stream), m_pos{&m_streambuffer, 0} - { - } - AvroStreamReader(const AvroStreamReader&) = delete; - AvroStreamReader& operator=(const AvroStreamReader&) = delete; - - int64_t ParseInt(const Core::Context& context); - void Advance(size_t n, const Core::Context& context); - // Read at least n bytes from m_stream and append data to m_streambuffer. Return number of bytes - // available in m_streambuffer; - size_t Preload(size_t n, const Core::Context& context); - size_t TryPreload(size_t n, const Core::Context& context); - // discards data that's before m_pos - void Discard(); - - private: - size_t AvailableBytes() const { return m_streambuffer.size() - m_pos.Offset; } - - private: - Core::IO::BodyStream* m_stream; - std::vector m_streambuffer; - ReaderPos m_pos; - - friend class AvroDatum; - }; - - class AvroSchema final { - public: - static const AvroSchema StringSchema; - static const AvroSchema BytesSchema; - static const AvroSchema IntSchema; - static const AvroSchema LongSchema; - static const AvroSchema FloatSchema; - static const AvroSchema DoubleSchema; - static const AvroSchema BoolSchema; - static const AvroSchema NullSchema; - static AvroSchema RecordSchema( - std::string name, - const std::vector>& fieldsSchema); - static AvroSchema ArraySchema(AvroSchema elementSchema); - static AvroSchema MapSchema(AvroSchema elementSchema); - static AvroSchema UnionSchema(std::vector schemas); - static AvroSchema FixedSchema(std::string name, int64_t size); - - const std::string& Name() const { return m_name; } - AvroDatumType Type() const { return m_type; } - const std::vector& FieldNames() const { return m_status->m_keys; } - AvroSchema ItemSchema() const { return m_status->m_schemas[0]; } - const std::vector& FieldSchemas() const { return m_status->m_schemas; } - size_t Size() const { return static_cast(m_status->m_size); } - - private: - explicit AvroSchema(AvroDatumType type) : m_type(type) {} - - private: - AvroDatumType m_type; - std::string m_name; - - struct SharedStatus - { - std::vector m_keys; - std::vector m_schemas; - int64_t m_size = 0; - }; - std::shared_ptr m_status; - }; - - class AvroDatum final { - public: - AvroDatum() : m_schema(AvroSchema::NullSchema) {} - explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {} - - void Fill(AvroStreamReader& reader, const Core::Context& context); - void Fill(AvroStreamReader::ReaderPos& data); - - const AvroSchema& Schema() const { return m_schema; } - - template T Value() const; - struct StringView - { - const uint8_t* Data = nullptr; - size_t Length = 0; - }; - - private: - AvroSchema m_schema; - AvroStreamReader::ReaderPos m_data; - }; - - using AvroMap = std::map; - - class AvroRecord final { - public: - bool HasField(const std::string& key) const { return FindField(key) != m_keys->size(); } - const AvroDatum& Field(const std::string& key) const { return m_values.at(FindField(key)); } - AvroDatum& Field(const std::string& key) { return m_values.at(FindField(key)); } - const AvroDatum& FieldAt(size_t i) const { return m_values.at(i); } - AvroDatum& FieldAt(size_t i) { return m_values.at(i); } - - private: - size_t FindField(const std::string& key) const - { - auto i = find(m_keys->begin(), m_keys->end(), key); - return i - m_keys->begin(); - } - const std::vector* m_keys = nullptr; - std::vector m_values; - - friend class AvroDatum; - }; - - class AvroObjectContainerReader final { - public: - explicit AvroObjectContainerReader(Core::IO::BodyStream& stream); - - bool End() const { return m_eof; } - // Calling Next() will invalidates the previous AvroDatum returned by this function and all - // AvroDatums propagated from there. - AvroDatum Next(const Core::Context& context) { return NextImpl(m_objectSchema.get(), context); } - - private: - AvroDatum NextImpl(const AvroSchema* schema, const Core::Context& context); - - private: - std::unique_ptr m_reader; - std::unique_ptr m_objectSchema; - std::string m_syncMarker; - int64_t m_remainingObjectInCurrentBlock = 0; - bool m_eof = false; - }; - - class AvroStreamParser final : public Core::IO::BodyStream { - public: - explicit AvroStreamParser( - std::unique_ptr inner, - std::function progressCallback, - std::function errorCallback) - : m_inner(std::move(inner)), m_parser(*m_inner), - m_progressCallback(std::move(progressCallback)), m_errorCallback(std::move(errorCallback)) - { - } - - int64_t Length() const override { return -1; } - void Rewind() override { this->m_inner->Rewind(); } - - private: - size_t OnRead(uint8_t* buffer, size_t count, const Azure::Core::Context& context) override; - - private: - std::unique_ptr m_inner; - AvroObjectContainerReader m_parser; - std::function m_progressCallback; - std::function m_errorCallback; - AvroDatum::StringView m_parserBuffer; - }; - -}}}} // namespace Azure::Storage::Blobs::_detail diff --git a/source/libs/azure/src/td_avro_parser.cpp b/source/libs/azure/src/td_avro_parser.cpp index 485980e007..62bd3a8151 100644 --- a/source/libs/azure/src/td_avro_parser.cpp +++ b/source/libs/azure/src/td_avro_parser.cpp @@ -1,5 +1,5 @@ #if defined(USE_S3) -#include "avro_parser.hpp" +#include #include #include diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp index b5a5c3c189..33ac774d0c 100644 --- a/source/libs/azure/src/td_block_blob_client.cpp +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -14,7 +14,7 @@ #include #endif -#include "avro_parser.hpp" +#include #include #include