Merge pull request #23223 from taosdata/fix/TD-26189-libs3

enh(tsdb/cos): new sdk for s3
This commit is contained in:
Hongze Cheng 2023-10-24 01:18:41 -05:00 committed by GitHub
commit 56bc18bbd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1372 additions and 23 deletions

View File

@ -127,6 +127,22 @@ option(
IF(${TD_LINUX})
option(
BUILD_WITH_S3
"If build with s3"
ON
)
IF(${BUILD_WITH_S3})
option(
BUILD_WITH_COS
"If build with cos"
OFF
)
ELSE ()
option(
BUILD_WITH_COS
"If build with cos"
@ -135,6 +151,8 @@ option(
ENDIF ()
ENDIF ()
option(
BUILD_WITH_SQLITE
"If build with sqlite"

View File

@ -1,18 +1,18 @@
# curl
ExternalProject_Add(curl
ExternalProject_Add(curl2
URL https://curl.se/download/curl-8.2.1.tar.gz
URL_HASH MD5=b25588a43556068be05e1624e0e74d41
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
#GIT_REPOSITORY https://github.com/curl/curl.git
#GIT_TAG curl-7_88_1
SOURCE_DIR "${TD_CONTRIB_DIR}/curl"
SOURCE_DIR "${TD_CONTRIB_DIR}/curl2"
DEPENDS openssl
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd
#CONFIGURE_COMMAND ./configure --without-ssl
BUILD_COMMAND make
UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 #--enable-debug
BUILD_COMMAND make -j
INSTALL_COMMAND make install
TEST_COMMAND ""
)

430
cmake/libs3.GNUmakefile Normal file
View File

@ -0,0 +1,430 @@
# GNUmakefile
#
# Copyright 2008 Bryan Ischo <bryan@ischo.com>
#
# This file is part of libs3.
#
# libs3 is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, version 3 or above of the License. You can also
# redistribute and/or modify it under the terms of the GNU General Public
# License, version 2 or above of the License.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of this library and its programs with the
# OpenSSL library, and distribute linked combinations including the two.
#
# libs3 is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# version 3 along with libs3, in a file named COPYING. If not, see
# <http://www.gnu.org/licenses/>.
#
# You should also have received a copy of the GNU General Public License
# version 2 along with libs3, in a file named COPYING-GPLv2. If not, see
# <http://www.gnu.org/licenses/>.
# I tried to use the autoconf/automake/autolocal/etc (i.e. autohell) tools
# but I just couldn't stomach them. Since this is a Makefile for POSIX
# systems, I will simply do away with autohell completely and use a GNU
# Makefile. GNU make ought to be available pretty much everywhere, so I
# don't see this being a significant issue for portability.
# All commands assume a GNU compiler. For systems which do not use a GNU
# compiler, write scripts with the same names as these commands, and taking
# the same arguments, and translate the arguments and commands into the
# appropriate non-POSIX ones as needed. libs3 assumes a GNU toolchain as
# the most portable way to build software possible. Non-POSIX, non-GNU
# systems can do the work of supporting this build infrastructure.
# --------------------------------------------------------------------------
# Set libs3 version number, unless it is already set.
LIBS3_VER_MAJOR ?= 4
LIBS3_VER_MINOR ?= 1
LIBS3_VER := $(LIBS3_VER_MAJOR).$(LIBS3_VER_MINOR)
# -----------------------------------------------------------------------------
# Determine verbosity. VERBOSE_SHOW should be prepended to every command which
# should only be displayed if VERBOSE is set. QUIET_ECHO may be used to
# echo text only if VERBOSE is not set. Typically, a VERBOSE_SHOW command will
# be paired with a QUIET_ECHO command, to provide a command which is displayed
# in VERBOSE mode, along with text which is displayed in non-VERBOSE mode to
# describe the command.
#
# No matter what VERBOSE is defined to, it ends up as true if it's defined.
# This will be weird if you defined VERBOSE=false in the environment, and we
# switch it to true here; but the meaning of VERBOSE is, "if it's defined to
# any value, then verbosity is turned on". So don't define VERBOSE if you
# don't want verbosity in the build process.
# -----------------------------------------------------------------------------
ifdef VERBOSE
VERBOSE = true
VERBOSE_ECHO = @ echo
VERBOSE_SHOW =
QUIET_ECHO = @ echo > /dev/null
else
VERBOSE = false
VERBOSE_ECHO = @ echo > /dev/null
VERBOSE_SHOW = @
QUIET_ECHO = @ echo
endif
# --------------------------------------------------------------------------
# BUILD directory
ifndef BUILD
ifdef DEBUG
BUILD := build-debug
else
BUILD := build
endif
endif
# --------------------------------------------------------------------------
# DESTDIR directory
ifndef DESTDIR
DESTDIR := ${HOME}/.cos-local.2
endif
# --------------------------------------------------------------------------
# LIBDIR directory
ifndef LIBDIR
LIBDIR := ${DESTDIR}/lib
endif
# --------------------------------------------------------------------------
# Compiler CC handling
ifndef CC
CC := gcc
endif
# --------------------------------------------------------------------------
# Acquire configuration information for libraries that libs3 depends upon
ifndef CURL_LIBS
CURL_LIBS := $(shell curl-config --libs)
endif
ifndef CURL_CFLAGS
CURL_CFLAGS := $(shell curl-config --cflags)
endif
ifndef LIBXML2_LIBS
LIBXML2_LIBS := $(shell xml2-config --libs)
endif
ifndef LIBXML2_CFLAGS
LIBXML2_CFLAGS := $(shell xml2-config --cflags)
endif
ifndef OPENSSL_LIBS
OPENSSL_LIBS := -lssl -lcrypto
endif
# --------------------------------------------------------------------------
# These CFLAGS assume a GNU compiler. For other compilers, write a script
# which converts these arguments into their equivalent for that particular
# compiler.
ifndef CFLAGS
ifdef DEBUG
CFLAGS := -g
else
CFLAGS := -O3
endif
endif
CFLAGS += -Wall -Werror -Wshadow -Wextra -Iinc \
$(CURL_CFLAGS) $(LIBXML2_CFLAGS) \
-DLIBS3_VER_MAJOR=\"$(LIBS3_VER_MAJOR)\" \
-DLIBS3_VER_MINOR=\"$(LIBS3_VER_MINOR)\" \
-DLIBS3_VER=\"$(LIBS3_VER)\" \
-D__STRICT_ANSI__ \
-D_ISOC99_SOURCE \
-D_POSIX_C_SOURCE=200112L
LDFLAGS = $(CURL_LIBS) $(LIBXML2_LIBS) $(OPENSSL_LIBS) -lpthread
STRIP ?= strip
INSTALL := install --strip-program=$(STRIP)
# --------------------------------------------------------------------------
# Default targets are everything
.PHONY: all
all: exported test
# --------------------------------------------------------------------------
# Exported targets are the library and driver program
.PHONY: exported
exported: libs3 s3 headers
exported_static: $(LIBS3_STATIC)
# --------------------------------------------------------------------------
# Install target
.PHONY: install install_static
install_static: exported_static
$(QUIET_ECHO) $(LIBDIR)/libs3.a: Installing static library
$(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/lib/libs3.a \
$(LIBDIR)/libs3.a
$(QUIET_ECHO) $(DESTDIR)/include/libs3.h: Installing header
$(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r inc/libs3.h \
$(DESTDIR)/include/libs3.h
install: exported
$(QUIET_ECHO) $(DESTDIR)/bin/s3: Installing executable
$(VERBOSE_SHOW) $(INSTALL) -Dps -m u+rwx,go+rx $(BUILD)/bin/s3 \
$(DESTDIR)/bin/s3
$(QUIET_ECHO) \
$(LIBDIR)/libs3.so.$(LIBS3_VER): Installing shared library
$(VERBOSE_SHOW) $(INSTALL) -Dps -m u+rw,go+r \
$(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR) \
$(LIBDIR)/libs3.so.$(LIBS3_VER)
$(QUIET_ECHO) \
$(LIBDIR)/libs3.so.$(LIBS3_VER_MAJOR): Linking shared library
$(VERBOSE_SHOW) ln -sf libs3.so.$(LIBS3_VER) \
$(LIBDIR)/libs3.so.$(LIBS3_VER_MAJOR)
$(QUIET_ECHO) $(LIBDIR)/libs3.so: Linking shared library
$(VERBOSE_SHOW) ln -sf libs3.so.$(LIBS3_VER_MAJOR) $(LIBDIR)/libs3.so
$(QUIET_ECHO) $(LIBDIR)/libs3.a: Installing static library
$(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/lib/libs3.a \
$(LIBDIR)/libs3.a
$(QUIET_ECHO) $(DESTDIR)/include/libs3.h: Installing header
$(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/include/libs3.h \
$(DESTDIR)/include/libs3.h
# --------------------------------------------------------------------------
# Uninstall target
.PHONY: uninstall
uninstall:
$(QUIET_ECHO) Installed files: Uninstalling
$(VERBOSE_SHOW) \
rm -f $(DESTDIR)/bin/s3 \
$(DESTDIR)/include/libs3.h \
$(DESTDIR)/lib/libs3.a \
$(DESTDIR)/lib/libs3.so \
$(DESTDIR)/lib/libs3.so.$(LIBS3_VER_MAJOR) \
$(DESTDIR)/lib/libs3.so.$(LIBS3_VER)
# --------------------------------------------------------------------------
# Compile target patterns
$(BUILD)/obj/%.o: src/%.c
$(QUIET_ECHO) $@: Compiling object
@ mkdir -p $(dir $(BUILD)/dep/$<)
@ $(CC) $(CFLAGS) -M -MG -MQ $@ -DCOMPILINGDEPENDENCIES \
-o $(BUILD)/dep/$(<:%.c=%.d) -c $<
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(CC) $(CFLAGS) -o $@ -c $<
$(BUILD)/obj/%.do: src/%.c
$(QUIET_ECHO) $@: Compiling dynamic object
$(QUIET_ECHO) cflags:${CFLAGS}
@ mkdir -p $(dir $(BUILD)/dep/$<)
@ $(CC) $(CFLAGS) -M -MG -MQ $@ -DCOMPILINGDEPENDENCIES \
-o $(BUILD)/dep/$(<:%.c=%.dd) -c $<
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(CC) $(CFLAGS) -fpic -fPIC -o $@ -c $<
# --------------------------------------------------------------------------
# libs3 library targets
LIBS3_SHARED = $(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR)
LIBS3_STATIC = $(BUILD)/lib/libs3.a
.PHONY: libs3
libs3: $(LIBS3_SHARED) $(LIBS3_STATIC)
LIBS3_SOURCES := bucket.c bucket_metadata.c error_parser.c general.c \
object.c request.c request_context.c \
response_headers_handler.c service_access_logging.c \
service.c simplexml.c util.c multipart.c
$(LIBS3_SHARED): $(LIBS3_SOURCES:%.c=$(BUILD)/obj/%.do)
$(QUIET_ECHO) $@: Building shared library
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(CC) -shared -Wl,-soname,libs3.so.$(LIBS3_VER_MAJOR) \
-o $@ $^ $(LDFLAGS)
$(LIBS3_STATIC): $(LIBS3_SOURCES:%.c=$(BUILD)/obj/%.o)
$(QUIET_ECHO) $@: Building static library
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(AR) cr $@ $^
# --------------------------------------------------------------------------
# Driver program targets
.PHONY: s3
s3: $(BUILD)/bin/s3
$(BUILD)/bin/s3: $(BUILD)/obj/s3.o $(LIBS3_SHARED)
$(QUIET_ECHO) $@: Building executable
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(CC) -o $@ $^ $(LDFLAGS)
# --------------------------------------------------------------------------
# libs3 header targets
.PHONY: headers
headers: $(BUILD)/include/libs3.h
$(BUILD)/include/libs3.h: inc/libs3.h
$(QUIET_ECHO) $@: Linking header
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) ln -sf $(abspath $<) $@
# --------------------------------------------------------------------------
# Test targets
.PHONY: test
test: $(BUILD)/bin/testsimplexml
$(BUILD)/bin/testsimplexml: $(BUILD)/obj/testsimplexml.o $(LIBS3_STATIC)
$(QUIET_ECHO) $@: Building executable
@ mkdir -p $(dir $@)
$(VERBOSE_SHOW) $(CC) -o $@ $^ $(LIBXML2_LIBS)
# --------------------------------------------------------------------------
# Clean target
.PHONY: clean
clean:
$(QUIET_ECHO) $(BUILD): Cleaning
$(VERBOSE_SHOW) rm -rf $(BUILD)
.PHONY: distclean
distclean:
$(QUIET_ECHO) $(BUILD): Cleaning
$(VERBOSE_SHOW) rm -rf $(BUILD)
# --------------------------------------------------------------------------
# Clean dependencies target
.PHONY: cleandeps
cleandeps:
$(QUIET_ECHO) $(BUILD)/dep: Cleaning dependencies
$(VERBOSE_SHOW) rm -rf $(BUILD)/dep
# --------------------------------------------------------------------------
# Dependencies
ALL_SOURCES := $(LIBS3_SOURCES) s3.c testsimplexml.c
$(foreach i, $(ALL_SOURCES), $(eval -include $(BUILD)/dep/src/$(i:%.c=%.d)))
$(foreach i, $(ALL_SOURCES), $(eval -include $(BUILD)/dep/src/$(i:%.c=%.dd)))
# --------------------------------------------------------------------------
# Debian package target
DEBPKG = $(BUILD)/pkg/libs3_$(LIBS3_VER).deb
DEBDEVPKG = $(BUILD)/pkg/libs3-dev_$(LIBS3_VER).deb
.PHONY: deb
deb: $(DEBPKG) $(DEBDEVPKG)
$(DEBPKG): DEBARCH = $(shell dpkg-architecture | grep ^DEB_BUILD_ARCH= | \
cut -d '=' -f 2)
$(DEBPKG): exported $(BUILD)/deb/DEBIAN/control $(BUILD)/deb/DEBIAN/shlibs \
$(BUILD)/deb/DEBIAN/postinst \
$(BUILD)/deb/usr/share/doc/libs3/changelog.gz \
$(BUILD)/deb/usr/share/doc/libs3/changelog.Debian.gz \
$(BUILD)/deb/usr/share/doc/libs3/copyright
DESTDIR=$(BUILD)/deb/usr $(MAKE) install
rm -rf $(BUILD)/deb/usr/include
rm -f $(BUILD)/deb/usr/lib/libs3.a
@mkdir -p $(dir $@)
fakeroot dpkg-deb -b $(BUILD)/deb $@
mv $@ $(BUILD)/pkg/libs3_$(LIBS3_VER)_$(DEBARCH).deb
$(DEBDEVPKG): DEBARCH = $(shell dpkg-architecture | grep ^DEB_BUILD_ARCH= | \
cut -d '=' -f 2)
$(DEBDEVPKG): exported $(BUILD)/deb-dev/DEBIAN/control \
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.gz \
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.Debian.gz \
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/copyright
DESTDIR=$(BUILD)/deb-dev/usr $(MAKE) install
rm -rf $(BUILD)/deb-dev/usr/bin
rm -f $(BUILD)/deb-dev/usr/lib/libs3.so*
@mkdir -p $(dir $@)
fakeroot dpkg-deb -b $(BUILD)/deb-dev $@
mv $@ $(BUILD)/pkg/libs3-dev_$(LIBS3_VER)_$(DEBARCH).deb
$(BUILD)/deb/DEBIAN/control: debian/control
@mkdir -p $(dir $@)
echo -n "Depends: " > $@
dpkg-shlibdeps -Sbuild -O $(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR) | \
cut -d '=' -f 2- >> $@
sed -e 's/LIBS3_VERSION/$(LIBS3_VER)/' \
< $< | sed -e 's/DEBIAN_ARCHITECTURE/$(DEBARCH)/' | \
grep -v ^Source: >> $@
$(BUILD)/deb-dev/DEBIAN/control: debian/control.dev
@mkdir -p $(dir $@)
sed -e 's/LIBS3_VERSION/$(LIBS3_VER)/' \
< $< | sed -e 's/DEBIAN_ARCHITECTURE/$(DEBARCH)/' > $@
$(BUILD)/deb/DEBIAN/shlibs:
echo -n "libs3 $(LIBS3_VER_MAJOR) libs3 " > $@
echo "(>= $(LIBS3_VER))" >> $@
$(BUILD)/deb/DEBIAN/postinst: debian/postinst
@mkdir -p $(dir $@)
cp $< $@
$(BUILD)/deb/usr/share/doc/libs3/copyright: LICENSE
@mkdir -p $(dir $@)
cp $< $@
@echo >> $@
@echo -n "An alternate location for the GNU General Public " >> $@
@echo "License version 3 on Debian" >> $@
@echo "systems is /usr/share/common-licenses/GPL-3." >> $@
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/copyright: LICENSE
@mkdir -p $(dir $@)
cp $< $@
@echo >> $@
@echo -n "An alternate location for the GNU General Public " >> $@
@echo "License version 3 on Debian" >> $@
@echo "systems is /usr/share/common-licenses/GPL-3." >> $@
$(BUILD)/deb/usr/share/doc/libs3/changelog.gz: debian/changelog
@mkdir -p $(dir $@)
gzip --best -c $< > $@
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.gz: debian/changelog
@mkdir -p $(dir $@)
gzip --best -c $< > $@
$(BUILD)/deb/usr/share/doc/libs3/changelog.Debian.gz: debian/changelog.Debian
@mkdir -p $(dir $@)
gzip --best -c $< > $@
$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.Debian.gz: \
debian/changelog.Debian
@mkdir -p $(dir $@)
gzip --best -c $< > $@

View File

@ -0,0 +1,16 @@
# libs3
ExternalProject_Add(libs3
GIT_REPOSITORY https://github.com/bji/libs3
#GIT_TAG v5.0.16
DEPENDS curl2 xml2
SOURCE_DIR "${TD_CONTRIB_DIR}/libs3"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
UPDATE_COMMAND ""
CONFIGURE_COMMAND cp ${TD_SUPPORT_DIR}/libs3.GNUmakefile GNUmakefile && sed -i "s|CFLAGS += -Wall -Werror|CFLAGS += -I'$ENV{HOME}/.cos-local.2/include' -L'$ENV{HOME}/.cos-local.2/lib' |" ./GNUmakefile
BUILD_COMMAND make build/lib/libs3.a
INSTALL_COMMAND make install_static
TEST_COMMAND ""
)

View File

@ -0,0 +1,15 @@
# openssl
ExternalProject_Add(openssl
URL https://www.openssl.org/source/openssl-3.1.3.tar.gz
URL_HASH SHA256=f0316a2ebd89e7f2352976445458689f80302093788c466692fb2a188b2eacf6
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/openssl"
BUILD_IN_SOURCE TRUE
#BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 -static #--no-shared
BUILD_COMMAND make -j
INSTALL_COMMAND make install_sw -j
TEST_COMMAND ""
)

View File

@ -0,0 +1,12 @@
# xml2
ExternalProject_Add(xml2
GIT_REPOSITORY https://github.com/GNOME/libxml2
GIT_TAG v2.11.5
SOURCE_DIR "${TD_CONTRIB_DIR}/xml2"
BINARY_DIR ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -8,6 +8,11 @@ endfunction(cat IN_FILE OUT_FILE)
if(${TD_LINUX})
if(${BUILD_WITH_S3})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/)
else()
set(CONTRIB_TMP_FILE3 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in3")
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
@ -37,6 +42,8 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
endif(${BUILD_WITH_S3})
endif(${TD_LINUX})
set(CONTRIB_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in")
@ -155,6 +162,16 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
# s3
if(${BUILD_WITH_S3})
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_S3)
else()
# cos
if(${BUILD_WITH_COS})
#cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -165,6 +182,8 @@ if(${BUILD_WITH_COS})
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif(${BUILD_WITH_S3})
# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -231,7 +250,6 @@ if(${BUILD_TEST})
)
endif(${TD_DARWIN})
endif(${BUILD_TEST})
# cJson
@ -248,6 +266,11 @@ target_include_directories(
)
unset(CMAKE_PROJECT_INCLUDE_BEFORE)
# xml2
if(${BUILD_WITH_S3})
add_subdirectory(xml2 EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_S3})
# lz4
add_subdirectory(lz4/build/cmake EXCLUDE_FROM_ALL)
target_include_directories(
@ -390,6 +413,12 @@ if (${BUILD_WITH_ROCKSDB})
endif()
endif()
if(${BUILD_WITH_S3})
INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.2/include)
MESSAGE("build with s3: ${BUILD_WITH_S3}")
else()
# cos
if(${BUILD_WITH_COS})
if(${TD_LINUX})
@ -397,7 +426,7 @@ if(${BUILD_WITH_COS})
#ADD_DEFINITIONS(-DMINIXML_LIBRARY=${CMAKE_BINARY_DIR}/build/lib/libxml.a)
option(ENABLE_TEST "Enable the tests" OFF)
INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include)
MESSAGE("$ENV{HOME}/.cos-local.1/include")
#MESSAGE("$ENV{HOME}/.cos-local.1/include")
set(CMAKE_BUILD_TYPE Release)
set(ORIG_CMAKE_PROJECT_NAME ${CMAKE_PROJECT_NAME})
@ -415,6 +444,8 @@ if(${BUILD_WITH_COS})
endif(${TD_LINUX})
endif(${BUILD_WITH_COS})
endif(${BUILD_WITH_S3})
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})

View File

@ -22,6 +22,9 @@ IF (TD_STORAGE)
add_definitions(-DUSE_COS)
ENDIF(${BUILD_WITH_COS})
IF(${BUILD_WITH_S3})
add_definitions(-DUSE_S3)
ENDIF(${BUILD_WITH_S3})
ENDIF(${TD_LINUX})
ENDIF ()

View File

@ -15,12 +15,12 @@
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "defines.h"
#include "os.h"
#include "tconfig.h"
#include "tgrant.h"
#include "tlog.h"
#include "tmisce.h"
#include "defines.h"
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"
@ -222,7 +222,7 @@ float tsFPrecision = 1E-8; // float column precision
double tsDPrecision = 1E-16; // double column precision
uint32_t tsMaxRange = 500; // max quantization intervals
uint32_t tsCurRange = 100; // current quantization intervals
bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom
bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// udf
@ -267,6 +267,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false;
int8_t tsS3Https = true;
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
int32_t tsS3BlockSize = 4096; // number of tsdb pages
int32_t tsS3BlockCacheSize = 16; // number of blocks
@ -308,6 +311,14 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN);
tstrncpy(tsS3Endpoint, cfgGetItem(pCfg, "s3Endpoint")->str, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, cfgGetItem(pCfg, "s3BucketName")->str, TSDB_FQDN_LEN);
char *proto = strstr(tsS3Endpoint, "https://");
if (!proto) {
tsS3Https = false;
tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN);
} else {
tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN);
}
char *cos = strstr(tsS3Endpoint, "cos.");
if (cos) {
char *appid = strrchr(tsS3BucketName, '-');
@ -319,7 +330,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
}
}
if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) {
#ifdef USE_COS
#if defined(USE_COS) || defined(USE_S3)
tsS3Enabled = true;
#endif
}
@ -1091,7 +1102,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval;
tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor));
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
@ -1680,14 +1690,14 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
}
return;
}
/* cannot alter s3BlockSize
if (strcasecmp(option, "s3BlockSize") == 0) {
int32_t newS3BlockSize = atoi(value);
uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize);
tsS3BlockSize = newS3BlockSize;
return;
}
*/
if (strcasecmp(option, "s3BlockCacheSize") == 0) {
int32_t newS3BlockCacheSize = atoi(value);
uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize);

View File

@ -162,6 +162,36 @@ target_link_libraries(
)
if(${TD_LINUX})
if(${BUILD_WITH_S3})
target_include_directories(
vnode
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
target_link_libraries(
vnode
# s3
PUBLIC ${S3_LIBRARY}
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
PUBLIC xml2
)
add_definitions(-DUSE_S3)
endif(${BUILD_WITH_S3})
if(${BUILD_WITH_COS})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
@ -194,7 +224,6 @@ target_include_directories(
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
if(${BUILD_WITH_COS})
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})

View File

@ -2,13 +2,798 @@
#include "vndCos.h"
extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[];
extern char tsS3AccessKeySecret[];
extern char tsS3BucketName[];
extern char tsS3AppId[];
extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[];
extern char tsS3AccessKeySecret[];
extern char tsS3BucketName[];
extern char tsS3AppId[];
extern char tsS3Hostname[];
extern int8_t tsS3Https;
#if defined(USE_S3)
#include "libs3.h"
static int verifyPeerG = 0;
static const char *awsRegionG = NULL;
static int forceG = 0;
static int showResponsePropertiesG = 0;
static S3Protocol protocolG = S3ProtocolHTTPS;
// static S3Protocol protocolG = S3ProtocolHTTP;
static S3UriStyle uriStyleG = S3UriStylePath;
static int retriesG = 5;
static int timeoutMsG = 0;
static int32_t s3Begin() {
S3Status status;
const char *hostname = tsS3Hostname;
const char *env_hn = getenv("S3_HOSTNAME");
if (env_hn) {
hostname = env_hn;
}
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
vError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
return -1;
}
protocolG = !tsS3Https;
return 0;
}
static void s3End() { S3_deinitialize(); }
int32_t s3Init() { return s3Begin(); }
void s3CleanUp() { s3End(); }
static int should_retry() {
/*
if (retriesG--) {
// Sleep before next retry; start out with a 1 second sleep
static int retrySleepInterval = 1 * SLEEP_UNITS_PER_SECOND;
sleep(retrySleepInterval);
// Next sleep 1 second longer
retrySleepInterval++;
return 1;
}
*/
return 0;
}
static void s3PrintError(const char *func, S3Status status, char error_details[]) {
if (status < S3StatusErrorAccessDenied) {
vError("%s: %s", __func__, S3_get_status_name(status));
} else {
vError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
}
}
typedef struct {
char err_msg[128];
S3Status status;
uint64_t content_length;
char *buf;
} TS3SizeCBD;
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
//(void)callbackData;
TS3SizeCBD *cbd = callbackData;
if (properties->contentLength > 0) {
cbd->content_length = properties->contentLength;
} else {
cbd->content_length = 0;
}
return S3StatusOK;
}
static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) {
TS3SizeCBD *cbd = callbackData;
cbd->status = status;
int len = 0;
const int elen = sizeof(cbd->err_msg);
if (error) {
if (error->message) {
len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message);
}
if (error->resource) {
len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource);
}
if (error->furtherDetails) {
len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails);
}
if (error->extraDetailsCount) {
len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n");
for (int i = 0; i < error->extraDetailsCount; i++) {
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
error->extraDetails[i].value);
}
}
}
}
typedef struct growbuffer {
// The total number of bytes, and the start byte
int size;
// The start byte
int start;
// The blocks
char data[64 * 1024];
struct growbuffer *prev, *next;
} growbuffer;
// returns nonzero on success, zero on out of memory
static int growbuffer_append(growbuffer **gb, const char *data, int dataLen) {
int origDataLen = dataLen;
while (dataLen) {
growbuffer *buf = *gb ? (*gb)->prev : 0;
if (!buf || (buf->size == sizeof(buf->data))) {
buf = (growbuffer *)malloc(sizeof(growbuffer));
if (!buf) {
return 0;
}
buf->size = 0;
buf->start = 0;
if (*gb && (*gb)->prev) {
buf->prev = (*gb)->prev;
buf->next = *gb;
(*gb)->prev->next = buf;
(*gb)->prev = buf;
} else {
buf->prev = buf->next = buf;
*gb = buf;
}
}
int toCopy = (sizeof(buf->data) - buf->size);
if (toCopy > dataLen) {
toCopy = dataLen;
}
memcpy(&(buf->data[buf->size]), data, toCopy);
buf->size += toCopy, data += toCopy, dataLen -= toCopy;
}
return origDataLen;
}
static void growbuffer_read(growbuffer **gb, int amt, int *amtReturn, char *buffer) {
*amtReturn = 0;
growbuffer *buf = *gb;
if (!buf) {
return;
}
*amtReturn = (buf->size > amt) ? amt : buf->size;
memcpy(buffer, &(buf->data[buf->start]), *amtReturn);
buf->start += *amtReturn, buf->size -= *amtReturn;
if (buf->size == 0) {
if (buf->next == buf) {
*gb = 0;
} else {
*gb = buf->next;
buf->prev->next = buf->next;
buf->next->prev = buf->prev;
}
free(buf);
buf = NULL;
}
}
static void growbuffer_destroy(growbuffer *gb) {
growbuffer *start = gb;
while (gb) {
growbuffer *next = gb->next;
free(gb);
gb = (next == start) ? 0 : next;
}
}
typedef struct put_object_callback_data {
char err_msg[128];
S3Status status;
// FILE *infile;
TdFilePtr infileFD;
growbuffer *gb;
uint64_t contentLength, originalContentLength;
uint64_t totalContentLength, totalOriginalContentLength;
int noStatus;
} put_object_callback_data;
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
typedef struct UploadManager {
char err_msg[128];
S3Status status;
// used for initial multipart
char *upload_id;
// used for upload part object
char **etags;
int next_etags_pos;
// used for commit Upload
growbuffer *gb;
int remaining;
} UploadManager;
typedef struct list_parts_callback_data {
char err_msg[128];
S3Status status;
int isTruncated;
char nextPartNumberMarker[24];
char initiatorId[256];
char initiatorDisplayName[256];
char ownerId[256];
char ownerDisplayName[256];
char storageClass[256];
int partsCount;
int handlePartsStart;
int allDetails;
int noPrint;
UploadManager *manager;
} list_parts_callback_data;
typedef struct MultipartPartData {
char err_msg[128];
S3Status status;
put_object_callback_data put_object_data;
int seq;
UploadManager *manager;
} MultipartPartData;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
put_object_callback_data *data = (put_object_callback_data *)callbackData;
if (data->infileFD == 0) {
MultipartPartData *mpd = (MultipartPartData *)callbackData;
data = &mpd->put_object_data;
}
int ret = 0;
if (data->contentLength) {
int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength);
if (data->gb) {
growbuffer_read(&(data->gb), toRead, &ret, buffer);
} else if (data->infileFD) {
// ret = fread(buffer, 1, toRead, data->infile);
ret = taosReadFile(data->infileFD, buffer, toRead);
}
}
data->contentLength -= ret;
data->totalContentLength -= ret;
/* log too many open files
if (data->contentLength && !data->noStatus) {
vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength);
vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) /
data->totalOriginalContentLength));
}
*/
return ret;
}
S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
UploadManager *manager = (UploadManager *)callbackData;
manager->upload_id = strdup(upload_id);
return S3StatusOK;
}
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
responsePropertiesCallback(properties, callbackData);
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
const char *etag = properties->eTag;
data->manager->etags[seq - 1] = strdup(etag);
data->manager->next_etags_pos = seq;
return S3StatusOK;
}
static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) {
UploadManager *manager = (UploadManager *)callbackData;
int ret = 0;
if (manager->remaining) {
int toRead = ((manager->remaining > bufferSize) ? bufferSize : manager->remaining);
growbuffer_read(&(manager->gb), toRead, &ret, buffer);
}
manager->remaining -= ret;
return ret;
}
/*
static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId,
const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName,
const char *storageClass, int partsCount, int handlePartsStart,
const S3ListPart *parts, void *callbackData) {
list_parts_callback_data *data = (list_parts_callback_data *)callbackData;
data->isTruncated = isTruncated;
data->handlePartsStart = handlePartsStart;
UploadManager *manager = data->manager;
if (nextPartNumberMarker) {
snprintf(data->nextPartNumberMarker, sizeof(data->nextPartNumberMarker), "%s", nextPartNumberMarker);
} else {
data->nextPartNumberMarker[0] = 0;
}
if (initiatorId) {
snprintf(data->initiatorId, sizeof(data->initiatorId), "%s", initiatorId);
} else {
data->initiatorId[0] = 0;
}
if (initiatorDisplayName) {
snprintf(data->initiatorDisplayName, sizeof(data->initiatorDisplayName), "%s", initiatorDisplayName);
} else {
data->initiatorDisplayName[0] = 0;
}
if (ownerId) {
snprintf(data->ownerId, sizeof(data->ownerId), "%s", ownerId);
} else {
data->ownerId[0] = 0;
}
if (ownerDisplayName) {
snprintf(data->ownerDisplayName, sizeof(data->ownerDisplayName), "%s", ownerDisplayName);
} else {
data->ownerDisplayName[0] = 0;
}
if (storageClass) {
snprintf(data->storageClass, sizeof(data->storageClass), "%s", storageClass);
} else {
data->storageClass[0] = 0;
}
if (partsCount && !data->partsCount && !data->noPrint) {
// printListPartsHeader();
}
int i;
for (i = 0; i < partsCount; i++) {
const S3ListPart *part = &(parts[i]);
char timebuf[256];
if (data->noPrint) {
manager->etags[handlePartsStart + i] = strdup(part->eTag);
manager->next_etags_pos++;
manager->remaining = manager->remaining - part->size;
} else {
time_t t = (time_t)part->lastModified;
strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
printf("%-30s", timebuf);
printf("%-15llu", (unsigned long long)part->partNumber);
printf("%-45s", part->eTag);
printf("%-15llu\n", (unsigned long long)part->size);
}
}
data->partsCount += partsCount;
return S3StatusOK;
}
static int try_get_parts_info(const char *bucketName, const char *key, UploadManager *manager) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback};
list_parts_callback_data data;
memset(&data, 0, sizeof(list_parts_callback_data));
data.partsCount = 0;
data.allDetails = 0;
data.manager = manager;
data.noPrint = 1;
do {
data.isTruncated = 0;
do {
S3_list_parts(&bucketContext, key, data.nextPartNumberMarker, manager->upload_id, 0, 0, 0, timeoutMsG,
&listPartsHandler, &data);
} while (S3_status_is_retryable(data.status) && should_retry());
if (data.status != S3StatusOK) {
break;
}
} while (data.isTruncated);
if (data.status == S3StatusOK) {
if (!data.partsCount) {
// printListMultipartHeader(data.allDetails);
}
} else {
s3PrintError(__func__, data.status, data.err_msg);
return -1;
}
return 0;
}
*/
int32_t s3PutObjectFromFile2(const char *file, const char *object) {
int32_t code = 0;
const char *key = object;
// const char *uploadId = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
const char *contentDispositionFilename = 0, *contentEncoding = 0;
int64_t expires = -1;
S3CannedAcl cannedAcl = S3CannedAclPrivate;
int metaPropertiesCount = 0;
S3NameValue metaProperties[S3_MAX_METADATA_COUNT];
char useServerSideEncryption = 0;
int noStatus = 0;
put_object_callback_data data;
// data.infile = 0;
data.infileFD = NULL;
data.gb = 0;
data.noStatus = noStatus;
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
vError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
vError("ERROR: %s Failed to open file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
contentLength;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3PutProperties putProperties = {contentType, md5,
cacheControl, contentDispositionFilename,
contentEncoding, expires,
cannedAcl, metaPropertiesCount,
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
do {
S3_put_object(&bucketContext, key, contentLength, &putProperties, 0, 0, &putObjectHandler, &data);
} while (S3_status_is_retryable(data.status) && should_retry());
if (data.infileFD) {
taosCloseFile(&data.infileFD);
} else if (data.gb) {
growbuffer_destroy(data.gb);
}
if (data.status != S3StatusOK) {
s3PrintError(__func__, data.status, data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data.contentLength) {
vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data.contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
} else {
uint64_t totalContentLength = contentLength;
uint64_t todoContentLength = contentLength;
UploadManager manager;
manager.upload_id = 0;
manager.gb = 0;
// div round up
int seq;
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8;
int totalSeq = ((contentLength + chunk_size - 1) / chunk_size);
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq);
manager.next_etags_pos = 0;
/*
if (uploadId) {
manager.upload_id = strdup(uploadId);
manager.remaining = contentLength;
if (!try_get_parts_info(tsS3BucketName, key, &manager)) {
fseek(data.infile, -(manager.remaining), 2);
taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END);
contentLength = manager.remaining;
goto upload;
} else {
goto clean;
}
}
*/
do {
S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
upload:
todoContentLength -= chunk_size * manager.next_etags_pos;
for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = data;
}
partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
// printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
putProperties.md5 = 0;
do {
S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.status) && should_retry());
if (partData.status != S3StatusOK) {
s3PrintError(__func__, partData.status, partData.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
contentLength -= chunk_size;
todoContentLength -= chunk_size;
}
int i;
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (i = 0; i < totalSeq; i++) {
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
i + 1, manager.etags[i]);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
clean:
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (i = 0; i < manager.next_etags_pos; i++) {
taosMemoryFree(manager.etags[i]);
}
growbuffer_destroy(manager.gb);
taosMemoryFree(manager.etags);
}
return code;
}
typedef struct list_bucket_callback_data {
char err_msg[128];
S3Status status;
int isTruncated;
char nextMarker[1024];
int keyCount;
int allDetails;
SArray *objectArray;
} list_bucket_callback_data;
static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,
const S3ListBucketContent *contents, int commonPrefixesCount,
const char **commonPrefixes, void *callbackData) {
list_bucket_callback_data *data = (list_bucket_callback_data *)callbackData;
data->isTruncated = isTruncated;
if ((!nextMarker || !nextMarker[0]) && contentsCount) {
nextMarker = contents[contentsCount - 1].key;
}
if (nextMarker) {
snprintf(data->nextMarker, sizeof(data->nextMarker), "%s", nextMarker);
} else {
data->nextMarker[0] = 0;
}
if (contentsCount && !data->keyCount) {
// printListBucketHeader(data->allDetails);
}
int i;
for (i = 0; i < contentsCount; ++i) {
const S3ListBucketContent *content = &(contents[i]);
// printf("%-50s", content->key);
char *object_key = strdup(content->key);
taosArrayPush(data->objectArray, &object_key);
}
data->keyCount += contentsCount;
for (i = 0; i < commonPrefixesCount; i++) {
// printf("\nCommon Prefix: %s\n", commonPrefixes[i]);
}
return S3StatusOK;
}
static void s3FreeObjectKey(void *pItem) {
char *key = *(char **)pItem;
taosMemoryFree(key);
}
void s3DeleteObjectsByPrefix(const char *prefix) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
&listBucketCallback};
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
data.objectArray = taosArrayInit(32, POINTER_BYTES);
if (!data.objectArray) {
vError("%s: %s", __func__, "out of memoty");
return;
}
if (marker) {
snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
} else {
data.nextMarker[0] = 0;
}
data.keyCount = 0;
data.allDetails = allDetails;
do {
data.isTruncated = 0;
do {
S3_list_bucket(&bucketContext, prefix, data.nextMarker, delimiter, maxkeys, 0, timeoutMsG, &listBucketHandler,
&data);
} while (S3_status_is_retryable(data.status) && should_retry());
if (data.status != S3StatusOK) {
break;
}
} while (data.isTruncated && (!maxkeys || (data.keyCount < maxkeys)));
if (data.status == S3StatusOK) {
if (data.keyCount > 0) {
// printListBucketHeader(allDetails);
s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray));
}
} else {
s3PrintError(__func__, data.status, data.err_msg);
}
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
}
void s3DeleteObjects(const char *object_name[], int nobject) {
int status = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
for (int i = 0; i < nobject; ++i) {
TS3SizeCBD cbd = {0};
do {
S3_delete_object(&bucketContext, object_name[i], 0, timeoutMsG, &responseHandler, &cbd);
} while (S3_status_is_retryable(cbd.status) && should_retry());
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
s3PrintError(__func__, cbd.status, cbd.err_msg);
}
}
}
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
TS3SizeCBD *cbd = callbackData;
if (cbd->content_length != bufferSize) {
cbd->status = S3StatusAbortedByCallback;
return S3StatusAbortedByCallback;
}
char *buf = taosMemoryCalloc(1, bufferSize);
if (buf) {
memcpy(buf, buffer, bufferSize);
cbd->buf = buf;
cbd->status = S3StatusOK;
return S3StatusOK;
} else {
cbd->status = S3StatusAbortedByCallback;
return S3StatusAbortedByCallback;
}
}
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) {
int status = 0;
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
&getObjectDataCallback};
TS3SizeCBD cbd = {0};
cbd.content_length = size;
do {
S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
return TAOS_SYSTEM_ERROR(EIO);
}
*ppBlock = cbd.buf;
return 0;
}
long s3Size(const char *object_name) {
long size = 0;
int status = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback};
TS3SizeCBD cbd = {0};
do {
S3_head_object(&bucketContext, object_name, 0, 0, &responseHandler, &cbd);
} while (S3_status_is_retryable(cbd.status) && should_retry());
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
}
size = cbd.content_length;
return size;
}
void s3EvictCache(const char *path, long object_size) {}
#elif defined(USE_COS)
#ifdef USE_COS
#include "cos_api.h"
#include "cos_http_io.h"
#include "cos_log.h"

View File

@ -60,7 +60,7 @@ docker run \
-v /root/.cargo/git:/root/.cargo/git \
-v /root/go/pkg/mod:/root/go/pkg/mod \
-v /root/.cache/go-build:/root/.cache/go-build \
-v /root/.cos-local.1:/root/.cos-local.1 \
-v /root/.cos-local.1:/root/.cos-local.2 \
-v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \
-v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \
-v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \
@ -89,7 +89,7 @@ docker run \
-v /root/.cargo/git:/root/.cargo/git \
-v /root/go/pkg/mod:/root/go/pkg/mod \
-v /root/.cache/go-build:/root/.cache/go-build \
-v /root/.cos-local.1:/root/.cos-local.1 \
-v /root/.cos-local.1:/root/.cos-local.2 \
-v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \
-v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \
-v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \