From 88d730f7eda456f7be8c7803edcd66f8514045f8 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 15 Oct 2024 12:47:10 +0800 Subject: [PATCH] az: clean up avro --- source/libs/azure/inc/td_avro_parser.h | 191 +++++++ source/libs/azure/src/td_avro_parser.cpp | 531 ++++++++++++++++++ .../libs/azure/src/td_block_blob_client.cpp | 2 +- 3 files changed, 723 insertions(+), 1 deletion(-) create mode 100644 source/libs/azure/inc/td_avro_parser.h create mode 100644 source/libs/azure/src/td_avro_parser.cpp diff --git a/source/libs/azure/inc/td_avro_parser.h b/source/libs/azure/inc/td_avro_parser.h new file mode 100644 index 0000000000..dae5a65dc7 --- /dev/null +++ b/source/libs/azure/inc/td_avro_parser.h @@ -0,0 +1,191 @@ +#pragma once + +#include "azure/storage/blobs/blob_options.hpp" + +#include + +#include +#include +#include + +namespace Azure { +namespace Storage { +namespace Blobs { +namespace _detail { +enum class AvroDatumType { + String, + Bytes, + Int, + Long, + Float, + Double, + Bool, + Null, + Record, + Enum, + Array, + Map, + Union, + Fixed, +}; + +class AvroStreamReader final { + public: + // position of a vector that lives through vector resizing + struct ReaderPos final { + const std::vector* BufferPtr = nullptr; + size_t Offset = 0; + }; + explicit AvroStreamReader(Core::IO::BodyStream& stream) : m_stream(&stream), m_pos{&m_streambuffer, 0} {} + AvroStreamReader(const AvroStreamReader&) = delete; + AvroStreamReader& operator=(const AvroStreamReader&) = delete; + + int64_t ParseInt(const Core::Context& context); + void Advance(size_t n, const Core::Context& context); + // Read at least n bytes from m_stream and append data to m_streambuffer. Return number of bytes + // available in m_streambuffer; + size_t Preload(size_t n, const Core::Context& context); + size_t TryPreload(size_t n, const Core::Context& context); + // discards data that's before m_pos + void Discard(); + + private: + size_t AvailableBytes() const { return m_streambuffer.size() - m_pos.Offset; } + + private: + Core::IO::BodyStream* m_stream; + std::vector m_streambuffer; + ReaderPos m_pos; + + friend class AvroDatum; +}; + +class AvroSchema final { + public: + static const AvroSchema StringSchema; + static const AvroSchema BytesSchema; + static const AvroSchema IntSchema; + static const AvroSchema LongSchema; + static const AvroSchema FloatSchema; + static const AvroSchema DoubleSchema; + static const AvroSchema BoolSchema; + static const AvroSchema NullSchema; + static AvroSchema RecordSchema(std::string name, const std::vector>& fieldsSchema); + static AvroSchema ArraySchema(AvroSchema elementSchema); + static AvroSchema MapSchema(AvroSchema elementSchema); + static AvroSchema UnionSchema(std::vector schemas); + static AvroSchema FixedSchema(std::string name, int64_t size); + + const std::string& Name() const { return m_name; } + AvroDatumType Type() const { return m_type; } + const std::vector& FieldNames() const { return m_status->m_keys; } + AvroSchema ItemSchema() const { return m_status->m_schemas[0]; } + const std::vector& FieldSchemas() const { return m_status->m_schemas; } + size_t Size() const { return static_cast(m_status->m_size); } + + private: + explicit AvroSchema(AvroDatumType type) : m_type(type) {} + + private: + AvroDatumType m_type; + std::string m_name; + + struct SharedStatus { + std::vector m_keys; + std::vector m_schemas; + int64_t m_size = 0; + }; + std::shared_ptr m_status; +}; + +class AvroDatum final { + public: + AvroDatum() : m_schema(AvroSchema::NullSchema) {} + explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {} + + void Fill(AvroStreamReader& reader, const Core::Context& context); + void Fill(AvroStreamReader::ReaderPos& data); + + const AvroSchema& Schema() const { return m_schema; } + + template + T Value() const; + struct StringView { + const uint8_t* Data = nullptr; + size_t Length = 0; + }; + + private: + AvroSchema m_schema; + AvroStreamReader::ReaderPos m_data; +}; + +using AvroMap = std::map; + +class AvroRecord final { + public: + bool HasField(const std::string& key) const { return FindField(key) != m_keys->size(); } + const AvroDatum& Field(const std::string& key) const { return m_values.at(FindField(key)); } + AvroDatum& Field(const std::string& key) { return m_values.at(FindField(key)); } + const AvroDatum& FieldAt(size_t i) const { return m_values.at(i); } + AvroDatum& FieldAt(size_t i) { return m_values.at(i); } + + private: + size_t FindField(const std::string& key) const { + auto i = find(m_keys->begin(), m_keys->end(), key); + return i - m_keys->begin(); + } + const std::vector* m_keys = nullptr; + std::vector m_values; + + friend class AvroDatum; +}; + +class AvroObjectContainerReader final { + public: + explicit AvroObjectContainerReader(Core::IO::BodyStream& stream); + + bool End() const { return m_eof; } + // Calling Next() will invalidates the previous AvroDatum returned by this function and all + // AvroDatums propagated from there. + AvroDatum Next(const Core::Context& context) { return NextImpl(m_objectSchema.get(), context); } + + private: + AvroDatum NextImpl(const AvroSchema* schema, const Core::Context& context); + + private: + std::unique_ptr m_reader; + std::unique_ptr m_objectSchema; + std::string m_syncMarker; + int64_t m_remainingObjectInCurrentBlock = 0; + bool m_eof = false; +}; + +class AvroStreamParser final : public Core::IO::BodyStream { + public: + explicit AvroStreamParser(std::unique_ptr inner, + std::function progressCallback, + std::function errorCallback) + : m_inner(std::move(inner)), + m_parser(*m_inner), + m_progressCallback(std::move(progressCallback)), + m_errorCallback(std::move(errorCallback)) {} + + int64_t Length() const override { return -1; } + void Rewind() override { this->m_inner->Rewind(); } + + private: + size_t OnRead(uint8_t* buffer, size_t count, const Azure::Core::Context& context) override; + + private: + std::unique_ptr m_inner; + AvroObjectContainerReader m_parser; + std::function m_progressCallback; + std::function m_errorCallback; + AvroDatum::StringView m_parserBuffer; +}; + +} // namespace _detail +} // namespace Blobs +} // namespace Storage +} // namespace Azure diff --git a/source/libs/azure/src/td_avro_parser.cpp b/source/libs/azure/src/td_avro_parser.cpp new file mode 100644 index 0000000000..485980e007 --- /dev/null +++ b/source/libs/azure/src/td_avro_parser.cpp @@ -0,0 +1,531 @@ +#if defined(USE_S3) +#include "avro_parser.hpp" + +#include +#include + +#include +#include + +namespace Azure { +namespace Storage { +namespace Blobs { +namespace _detail { + +namespace { +int64_t parseInt(AvroStreamReader::ReaderPos& data) { + uint64_t r = 0; + int nb = 0; + while (true) { + uint8_t c = (*data.BufferPtr)[data.Offset++]; + r = r | ((static_cast(c) & 0x7f) << (nb * 7)); + if (c & 0x80) { + ++nb; + continue; + } + break; + } + return static_cast(r >> 1) ^ -static_cast(r & 0x01); +} + +AvroSchema ParseSchemaFromJsonString(const std::string& jsonSchema) { + const static std::map BuiltinNameSchemaMap = { + {"string", AvroSchema::StringSchema}, {"bytes", AvroSchema::BytesSchema}, {"int", AvroSchema::IntSchema}, + {"long", AvroSchema::LongSchema}, {"float", AvroSchema::FloatSchema}, {"double", AvroSchema::DoubleSchema}, + {"boolean", AvroSchema::BoolSchema}, {"null", AvroSchema::NullSchema}, {"string", AvroSchema::StringSchema}, + }; + std::map nameSchemaMap = BuiltinNameSchemaMap; + + std::function parseSchemaFromJsonObject; + parseSchemaFromJsonObject = [&](const Core::Json::_internal::json& obj) -> AvroSchema { + if (obj.is_string()) { + auto typeName = obj.get(); + return nameSchemaMap.find(typeName)->second; + } else if (obj.is_array()) { + std::vector unionSchemas; + for (const auto& s : obj) { + unionSchemas.push_back(parseSchemaFromJsonObject(s)); + } + return AvroSchema::UnionSchema(std::move(unionSchemas)); + } else if (obj.is_object()) { + if (obj.count("namespace") != 0) { + throw std::runtime_error("Namespace isn't supported yet in Avro schema."); + } + if (obj.count("aliases") != 0) { + throw std::runtime_error("Alias isn't supported yet in Avro schema."); + } + auto typeName = obj["type"].get(); + auto i = nameSchemaMap.find(typeName); + if (i != nameSchemaMap.end()) { + return i->second; + } + if (typeName == "record") { + std::vector> fieldsSchema; + for (const auto& field : obj["fields"]) { + fieldsSchema.push_back( + std::make_pair(field["name"].get(), parseSchemaFromJsonObject(field["type"]))); + } + + const std::string recordName = obj["name"].get(); + auto recordSchema = AvroSchema::RecordSchema(recordName, std::move(fieldsSchema)); + nameSchemaMap.insert(std::make_pair(recordName, recordSchema)); + return recordSchema; + } else if (typeName == "enum") { + throw std::runtime_error("Enum type isn't supported yet in Avro schema."); + } else if (typeName == "array") { + return AvroSchema::ArraySchema(parseSchemaFromJsonObject(obj["items"])); + } else if (typeName == "map") { + return AvroSchema::MapSchema(parseSchemaFromJsonObject(obj["items"])); + } else if (typeName == "fixed") { + const std::string fixedName = obj["name"].get(); + auto fixedSchema = AvroSchema::FixedSchema(fixedName, obj["size"].get()); + nameSchemaMap.insert(std::make_pair(fixedName, fixedSchema)); + return fixedSchema; + } else { + throw std::runtime_error("Unrecognized type " + typeName + " in Avro schema."); + } + } + AZURE_UNREACHABLE_CODE(); + }; + + auto jsonRoot = Core::Json::_internal::json::parse(jsonSchema.begin(), jsonSchema.end()); + return parseSchemaFromJsonObject(jsonRoot); +} +} // namespace + +int64_t AvroStreamReader::ParseInt(const Core::Context& context) { + uint64_t r = 0; + int nb = 0; + while (true) { + Preload(1, context); + uint8_t c = m_streambuffer[m_pos.Offset++]; + + r = r | ((static_cast(c) & 0x7f) << (nb * 7)); + if (c & 0x80) { + ++nb; + continue; + } + break; + } + return static_cast(r >> 1) ^ -static_cast(r & 0x01); +} + +void AvroStreamReader::Advance(size_t n, const Core::Context& context) { + Preload(n, context); + m_pos.Offset += n; +} + +size_t AvroStreamReader::Preload(size_t n, const Core::Context& context) { + size_t oldAvailable = AvailableBytes(); + while (true) { + size_t newAvailable = TryPreload(n, context); + if (newAvailable >= n) { + return newAvailable; + } + if (oldAvailable == newAvailable) { + throw std::runtime_error("Unexpected EOF of Avro stream."); + } + oldAvailable = newAvailable; + } + AZURE_UNREACHABLE_CODE(); +} + +size_t AvroStreamReader::TryPreload(size_t n, const Core::Context& context) { + size_t availableBytes = AvailableBytes(); + if (availableBytes >= n) { + return availableBytes; + } + const size_t MinRead = 4096; + size_t tryReadSize = (std::max)(n, MinRead); + size_t currSize = m_streambuffer.size(); + m_streambuffer.resize(m_streambuffer.size() + tryReadSize); + size_t actualReadSize = m_stream->Read(m_streambuffer.data() + currSize, tryReadSize, context); + m_streambuffer.resize(currSize + actualReadSize); + return AvailableBytes(); +} + +void AvroStreamReader::Discard() { + constexpr size_t MinimumReleaseMemory = 128 * 1024; + if (m_pos.Offset < MinimumReleaseMemory) { + return; + } + const size_t availableBytes = AvailableBytes(); + std::memmove(&m_streambuffer[0], &m_streambuffer[m_pos.Offset], availableBytes); + m_streambuffer.resize(availableBytes); + m_pos.Offset = 0; +} + +const AvroSchema AvroSchema::StringSchema(AvroDatumType::String); +const AvroSchema AvroSchema::BytesSchema(AvroDatumType::Bytes); +const AvroSchema AvroSchema::IntSchema(AvroDatumType::Int); +const AvroSchema AvroSchema::LongSchema(AvroDatumType::Long); +const AvroSchema AvroSchema::FloatSchema(AvroDatumType::Float); +const AvroSchema AvroSchema::DoubleSchema(AvroDatumType::Double); +const AvroSchema AvroSchema::BoolSchema(AvroDatumType::Bool); +const AvroSchema AvroSchema::NullSchema(AvroDatumType::Null); + +AvroSchema AvroSchema::RecordSchema(std::string name, + const std::vector>& fieldsSchema) { + AvroSchema recordSchema(AvroDatumType::Record); + recordSchema.m_name = std::move(name); + recordSchema.m_status = std::make_shared(); + for (auto& i : fieldsSchema) { + recordSchema.m_status->m_keys.push_back(i.first); + recordSchema.m_status->m_schemas.push_back(i.second); + } + return recordSchema; +} + +AvroSchema AvroSchema::ArraySchema(AvroSchema elementSchema) { + AvroSchema arraySchema(AvroDatumType::Array); + arraySchema.m_status = std::make_shared(); + arraySchema.m_status->m_schemas.push_back(std::move(elementSchema)); + return arraySchema; +} + +AvroSchema AvroSchema::MapSchema(AvroSchema elementSchema) { + AvroSchema mapSchema(AvroDatumType::Map); + mapSchema.m_status = std::make_shared(); + mapSchema.m_status->m_schemas.push_back(std::move(elementSchema)); + return mapSchema; +} + +AvroSchema AvroSchema::UnionSchema(std::vector schemas) { + AvroSchema unionSchema(AvroDatumType::Union); + unionSchema.m_status = std::make_shared(); + unionSchema.m_status->m_schemas = std::move(schemas); + return unionSchema; +} + +AvroSchema AvroSchema::FixedSchema(std::string name, int64_t size) { + AvroSchema fixedSchema(AvroDatumType::Fixed); + fixedSchema.m_name = std::move(name); + fixedSchema.m_status = std::make_shared(); + fixedSchema.m_status->m_size = size; + return fixedSchema; +} + +void AvroDatum::Fill(AvroStreamReader& reader, const Core::Context& context) { + m_data = reader.m_pos; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + int64_t stringSize = reader.ParseInt(context); + reader.Advance(static_cast(stringSize), context); + } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || + m_schema.Type() == AvroDatumType::Enum) { + reader.ParseInt(context); + } else if (m_schema.Type() == AvroDatumType::Float) { + reader.Advance(4, context); + } else if (m_schema.Type() == AvroDatumType::Double) { + reader.Advance(8, context); + } else if (m_schema.Type() == AvroDatumType::Bool) { + reader.Advance(1, context); + } else if (m_schema.Type() == AvroDatumType::Null) { + reader.Advance(0, context); + } else if (m_schema.Type() == AvroDatumType::Record) { + for (const auto& s : m_schema.FieldSchemas()) { + AvroDatum(s).Fill(reader, context); + } + } else if (m_schema.Type() == AvroDatumType::Array) { + while (true) { + int64_t numElementsInBlock = reader.ParseInt(context); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = reader.ParseInt(context); + reader.Advance(static_cast(blockSize), context); + } else { + for (auto i = 0; i < numElementsInBlock; ++i) { + AvroDatum(m_schema.ItemSchema()).Fill(reader, context); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Map) { + while (true) { + int64_t numElementsInBlock = reader.ParseInt(context); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = reader.ParseInt(context); + reader.Advance(static_cast(blockSize), context); + } else { + for (int64_t i = 0; i < numElementsInBlock; ++i) { + AvroDatum(AvroSchema::StringSchema).Fill(reader, context); + AvroDatum(m_schema.ItemSchema()).Fill(reader, context); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = reader.ParseInt(context); + AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(reader, context); + } else if (m_schema.Type() == AvroDatumType::Fixed) { + reader.Advance(m_schema.Size(), context); + } else { + AZURE_UNREACHABLE_CODE(); + } +} + +void AvroDatum::Fill(AvroStreamReader::ReaderPos& data) { + m_data = data; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + int64_t stringSize = parseInt(data); + data.Offset += static_cast(stringSize); + } else if (m_schema.Type() == AvroDatumType::Int || m_schema.Type() == AvroDatumType::Long || + m_schema.Type() == AvroDatumType::Enum) { + parseInt(data); + } else if (m_schema.Type() == AvroDatumType::Float) { + data.Offset += 4; + } else if (m_schema.Type() == AvroDatumType::Double) { + data.Offset += 8; + } else if (m_schema.Type() == AvroDatumType::Bool) { + data.Offset += 1; + } else if (m_schema.Type() == AvroDatumType::Null) { + data.Offset += 0; + } else if (m_schema.Type() == AvroDatumType::Record) { + for (const auto& s : m_schema.FieldSchemas()) { + AvroDatum(s).Fill(data); + } + } else if (m_schema.Type() == AvroDatumType::Array) { + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = parseInt(data); + data.Offset += static_cast(blockSize); + } else { + for (auto i = 0; i < numElementsInBlock; ++i) { + AvroDatum(m_schema.ItemSchema()).Fill(data); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Map) { + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } else if (numElementsInBlock < 0) { + int64_t blockSize = parseInt(data); + data.Offset += static_cast(blockSize); + } else { + for (int64_t i = 0; i < numElementsInBlock; ++i) { + AvroDatum(AvroSchema::StringSchema).Fill(data); + AvroDatum(m_schema.ItemSchema()).Fill(data); + } + } + } + } else if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = parseInt(data); + AvroDatum(m_schema.FieldSchemas()[static_cast(i)]).Fill(data); + } else if (m_schema.Type() == AvroDatumType::Fixed) { + data.Offset += m_schema.Size(); + } else { + AZURE_UNREACHABLE_CODE(); + } +} + +template <> +AvroDatum::StringView AvroDatum::Value() const { + auto data = m_data; + if (m_schema.Type() == AvroDatumType::String || m_schema.Type() == AvroDatumType::Bytes) { + const int64_t length = parseInt(data); + const uint8_t* start = &(*data.BufferPtr)[data.Offset]; + StringView ret{start, static_cast(length)}; + data.Offset += static_cast(length); + return ret; + } + if (m_schema.Type() == AvroDatumType::Fixed) { + const size_t fixedSize = m_schema.Size(); + const uint8_t* start = &(*data.BufferPtr)[data.Offset]; + StringView ret{start, fixedSize}; + data.Offset += fixedSize; + return ret; + } + AZURE_UNREACHABLE_CODE(); +} + +template <> +std::string AvroDatum::Value() const { + auto stringView = Value(); + return std::string(stringView.Data, stringView.Data + stringView.Length); +} + +template <> +std::vector AvroDatum::Value() const { + auto stringView = Value(); + return std::vector(stringView.Data, stringView.Data + stringView.Length); +} + +template <> +int64_t AvroDatum::Value() const { + auto data = m_data; + return parseInt(data); +} + +template <> +int32_t AvroDatum::Value() const { + return static_cast(Value()); +} + +template <> +bool AvroDatum::Value() const { + return Value(); +} + +template <> +std::nullptr_t AvroDatum::Value() const { + return nullptr; +} + +template <> +AvroRecord AvroDatum::Value() const { + auto data = m_data; + + AvroRecord r; + r.m_keys = &m_schema.FieldNames(); + for (const auto& schema : m_schema.FieldSchemas()) { + auto datum = AvroDatum(schema); + datum.Fill(data); + r.m_values.push_back(std::move(datum)); + } + + return r; +} + +template <> +AvroMap AvroDatum::Value() const { + auto data = m_data; + + AvroMap m; + while (true) { + int64_t numElementsInBlock = parseInt(data); + if (numElementsInBlock == 0) { + break; + } + if (numElementsInBlock < 0) { + numElementsInBlock = -numElementsInBlock; + parseInt(data); + } + for (int64_t i = 0; i < numElementsInBlock; ++i) { + auto keyDatum = AvroDatum(AvroSchema::StringSchema); + keyDatum.Fill(data); + auto valueDatum = AvroDatum(m_schema.ItemSchema()); + valueDatum.Fill(data); + m[keyDatum.Value()] = valueDatum; + } + } + return m; +} + +template <> +AvroDatum AvroDatum::Value() const { + auto data = m_data; + if (m_schema.Type() == AvroDatumType::Union) { + int64_t i = parseInt(data); + auto datum = AvroDatum(m_schema.FieldSchemas()[static_cast(i)]); + datum.Fill(data); + return datum; + } + AZURE_UNREACHABLE_CODE(); +} + +AvroObjectContainerReader::AvroObjectContainerReader(Core::IO::BodyStream& stream) + : m_reader(std::make_unique(stream)) {} + +AvroDatum AvroObjectContainerReader::NextImpl(const AvroSchema* schema, const Core::Context& context) { + AZURE_ASSERT_FALSE(m_eof); + static const auto SyncMarkerSchema = AvroSchema::FixedSchema("Sync", 16); + if (!schema) { + static AvroSchema FileHeaderSchema = []() { + std::vector> fieldsSchema; + fieldsSchema.push_back(std::make_pair("magic", AvroSchema::FixedSchema("Magic", 4))); + fieldsSchema.push_back(std::make_pair("meta", AvroSchema::MapSchema(AvroSchema::BytesSchema))); + fieldsSchema.push_back(std::make_pair("sync", SyncMarkerSchema)); + return AvroSchema::RecordSchema("org.apache.avro.file.Header", std::move(fieldsSchema)); + }(); + auto fileHeaderDatum = AvroDatum(FileHeaderSchema); + fileHeaderDatum.Fill(*m_reader, context); + auto fileHeader = fileHeaderDatum.Value(); + if (fileHeader.Field("magic").Value() != "Obj\01") { + throw std::runtime_error("Invalid Avro object container magic."); + } + AvroMap meta = fileHeader.Field("meta").Value(); + std::string objectSchemaJson = meta["avro.schema"].Value(); + std::string codec = "null"; + if (meta.count("avro.codec") != 0) { + codec = meta["avro.codec"].Value(); + } + if (codec != "null") { + throw std::runtime_error("Unsupported Avro codec: " + codec); + } + m_syncMarker = fileHeader.Field("sync").Value(); + m_objectSchema = std::make_unique(ParseSchemaFromJsonString(objectSchemaJson)); + schema = m_objectSchema.get(); + } + + if (m_remainingObjectInCurrentBlock == 0) { + m_reader->Discard(); + m_remainingObjectInCurrentBlock = m_reader->ParseInt(context); + int64_t ObjectsSize = m_reader->ParseInt(context); + m_reader->Preload(static_cast(ObjectsSize), context); + } + + auto objectDatum = AvroDatum(*m_objectSchema); + objectDatum.Fill(*m_reader, context); + if (--m_remainingObjectInCurrentBlock == 0) { + auto markerDatum = AvroDatum(SyncMarkerSchema); + markerDatum.Fill(*m_reader, context); + auto marker = markerDatum.Value(); + if (marker != m_syncMarker) { + throw std::runtime_error("Sync marker doesn't match."); + } + m_eof = m_reader->TryPreload(1, context) == 0; + } + return objectDatum; +} + +size_t AvroStreamParser::OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) { + if (m_parserBuffer.Length != 0) { + size_t bytesToCopy = (std::min)(m_parserBuffer.Length, count); + std::memcpy(buffer, m_parserBuffer.Data, bytesToCopy); + m_parserBuffer.Data += bytesToCopy; + m_parserBuffer.Length -= bytesToCopy; + return bytesToCopy; + } + while (!m_parser.End()) { + auto datum = m_parser.Next(context); + if (datum.Schema().Type() == AvroDatumType::Union) { + datum = datum.Value(); + } + if (datum.Schema().Type() != AvroDatumType::Record) { + continue; + } + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.resultData") { + auto record = datum.Value(); + auto dataDatum = record.Field("data"); + m_parserBuffer = dataDatum.Value(); + return OnRead(buffer, count, context); + } + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.progress" && m_progressCallback) { + auto record = datum.Value(); + auto bytesScanned = record.Field("bytesScanned").Value(); + auto totalBytes = record.Field("totalBytes").Value(); + m_progressCallback(bytesScanned, totalBytes); + } + if (datum.Schema().Name() == "com.microsoft.azure.storage.queryBlobContents.error" && m_errorCallback) { + auto record = datum.Value(); + BlobQueryError e; + e.Name = record.Field("name").Value(); + e.Description = record.Field("description").Value(); + e.IsFatal = record.Field("fatal").Value(); + e.Position = record.Field("position").Value(); + m_errorCallback(std::move(e)); + } + } + return 0; +} +} // namespace _detail +} // namespace Blobs +} // namespace Storage +} // namespace Azure + +#endif diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp index e75c6ae17f..b5a5c3c189 100644 --- a/source/libs/azure/src/td_block_blob_client.cpp +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -14,7 +14,7 @@ #include #endif -#include "./avro_parser.hpp" +#include "avro_parser.hpp" #include #include