diff --git a/CMakeLists.txt b/CMakeLists.txt index 5048287cf6..66a6fd328d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,8 +16,6 @@ set(TD_SUPPORT_DIR "${TD_SOURCE_DIR}/cmake") set(TD_CONTRIB_DIR "${TD_SOURCE_DIR}/contrib") - - include(${TD_SUPPORT_DIR}/cmake.platform) include(${TD_SUPPORT_DIR}/cmake.define) include(${TD_SUPPORT_DIR}/cmake.options) @@ -46,4 +44,4 @@ add_subdirectory(examples/c) include(${TD_SUPPORT_DIR}/cmake.install) # docs -add_subdirectory(docs/doxgen) \ No newline at end of file +add_subdirectory(docs/doxgen) diff --git a/cmake/cmake.options b/cmake/cmake.options index 1d4e9ba515..d34c34dd89 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -128,11 +128,43 @@ option( IF(${TD_LINUX}) option( - BUILD_WITH_COS - "If build with cos" + BUILD_S3 + "If build with s3" ON ) +option( + BUILD_WITH_S3 + "If build with s3" + ON +) + +option( + BUILD_WITH_COS + "If build with cos" + OFF +) + +ENDIF () + +IF(${BUILD_S3}) + +IF(${BUILD_WITH_S3}) + +option(BUILD_WITH_COS "If build with cos" OFF) + +ELSE () + +option(BUILD_WITH_COS "If build with cos" ON) + +ENDIF () + +ELSE () + +option(BUILD_WITH_S3 "If build with s3" OFF) + +option(BUILD_WITH_COS "If build with cos" OFF) + ENDIF () option( diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 18fd17f018..9b96ebe7cb 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -93,36 +93,42 @@ ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows") ENDIF() IF ("${CPUTYPE}" STREQUAL "") - IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)") + IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)|(x86_64)|(X86_64)") MESSAGE(STATUS "Current platform is amd64") SET(PLATFORM_ARCH_STR "amd64") + SET(CPUTYPE "x64") SET(TD_INTEL_64 TRUE) ADD_DEFINITIONS("-D_TD_X86_") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)") MESSAGE(STATUS "Current platform is x86") SET(PLATFORM_ARCH_STR "i386") + SET(CPUTYPE "x86") SET(TD_INTEL_32 TRUE) ADD_DEFINITIONS("-D_TD_X86_") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l") MESSAGE(STATUS "Current platform is aarch32") SET(PLATFORM_ARCH_STR "arm") + SET(CPUTYPE "arm32") SET(TD_ARM_32 TRUE) ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_32") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(aarch64)|(arm64)") MESSAGE(STATUS "Current platform is aarch64") SET(PLATFORM_ARCH_STR "arm64") + SET(CPUTYPE "arm64") SET(TD_ARM_64 TRUE) ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_64") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64") MESSAGE(STATUS "The current platform is loongarch64") SET(PLATFORM_ARCH_STR "loongarch64") + SET(CPUTYPE "loongarch64") SET(TD_LOONGARCH_64 TRUE) ADD_DEFINITIONS("-D_TD_LOONGARCH_") ADD_DEFINITIONS("-D_TD_LOONGARCH_64") ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "mips64") SET(PLATFORM_ARCH_STR "mips") + SET(CPUTYPE "mips64") MESSAGE(STATUS "input cpuType: mips64") SET(TD_MIPS_64 TRUE) ADD_DEFINITIONS("-D_TD_MIPS_") diff --git a/cmake/cmake.version b/cmake/cmake.version index fa6ec4df17..b8871dbce3 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -50,7 +50,19 @@ ENDIF () IF (DEFINED VERDATE) SET(TD_VER_DATE ${VERDATE}) ELSE () - STRING(TIMESTAMP TD_VER_DATE "%Y-%m-%d %H:%M:%S") + STRING(COMPARE GREATER_EQUAL "${CMAKE_VERSION}" "3.26" TD_CMAKE_SUPPORT_TZ) + + IF (TD_CMAKE_SUPPORT_TZ) + STRING(TIMESTAMP TD_VER_DATE "%Y-%m-%d %H:%M:%S %z") + ELSE () + IF (TD_WINDOWS) + STRING(TIMESTAMP TD_VER_DATE "%Y-%m-%d %H:%M:%S") + ELSE () + EXECUTE_PROCESS(COMMAND date +"%F %T %z" OUTPUT_VARIABLE TD_VER_DATE) + STRING(REPLACE "\"" "" TD_VER_DATE ${TD_VER_DATE}) + STRING(STRIP ${TD_VER_DATE} TD_VER_DATE) + ENDIF () + ENDIF () ENDIF () IF (DEFINED VERTYPE) @@ -67,9 +79,9 @@ ELSE () ELSEIF (TD_LINUX_32) SET(TD_VER_CPUTYPE "x86") ELSEIF (TD_ARM_32) - SET(TD_VER_CPUTYPE "x86") + SET(TD_VER_CPUTYPE "arm32") ELSEIF (TD_MIPS_32) - SET(TD_VER_CPUTYPE "x86") + SET(TD_VER_CPUTYPE "mips32") ELSE () SET(TD_VER_CPUTYPE "x64") ENDIF () diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in index 1f2291c519..458a518092 100644 --- a/cmake/curl_CMakeLists.txt.in +++ b/cmake/curl_CMakeLists.txt.in @@ -1,18 +1,18 @@ # curl -ExternalProject_Add(curl +ExternalProject_Add(curl2 URL https://curl.se/download/curl-8.2.1.tar.gz URL_HASH MD5=b25588a43556068be05e1624e0e74d41 DOWNLOAD_NO_PROGRESS 1 DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" #GIT_REPOSITORY https://github.com/curl/curl.git #GIT_TAG curl-7_88_1 - SOURCE_DIR "${TD_CONTRIB_DIR}/curl" + SOURCE_DIR "${TD_CONTRIB_DIR}/curl2" + DEPENDS openssl BUILD_IN_SOURCE TRUE BUILD_ALWAYS 1 - #UPDATE_COMMAND "" - CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd - #CONFIGURE_COMMAND ./configure --without-ssl - BUILD_COMMAND make + UPDATE_COMMAND "" + CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 #--enable-debug + BUILD_COMMAND make -j INSTALL_COMMAND make install TEST_COMMAND "" ) diff --git a/cmake/libs3.GNUmakefile b/cmake/libs3.GNUmakefile new file mode 100644 index 0000000000..98f98d8224 --- /dev/null +++ b/cmake/libs3.GNUmakefile @@ -0,0 +1,430 @@ +# GNUmakefile +# +# Copyright 2008 Bryan Ischo +# +# This file is part of libs3. +# +# libs3 is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, version 3 or above of the License. You can also +# redistribute and/or modify it under the terms of the GNU General Public +# License, version 2 or above of the License. +# +# In addition, as a special exception, the copyright holders give +# permission to link the code of this library and its programs with the +# OpenSSL library, and distribute linked combinations including the two. +# +# libs3 is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# version 3 along with libs3, in a file named COPYING. If not, see +# . +# +# You should also have received a copy of the GNU General Public License +# version 2 along with libs3, in a file named COPYING-GPLv2. If not, see +# . + +# I tried to use the autoconf/automake/autolocal/etc (i.e. autohell) tools +# but I just couldn't stomach them. Since this is a Makefile for POSIX +# systems, I will simply do away with autohell completely and use a GNU +# Makefile. GNU make ought to be available pretty much everywhere, so I +# don't see this being a significant issue for portability. + +# All commands assume a GNU compiler. For systems which do not use a GNU +# compiler, write scripts with the same names as these commands, and taking +# the same arguments, and translate the arguments and commands into the +# appropriate non-POSIX ones as needed. libs3 assumes a GNU toolchain as +# the most portable way to build software possible. Non-POSIX, non-GNU +# systems can do the work of supporting this build infrastructure. + + +# -------------------------------------------------------------------------- +# Set libs3 version number, unless it is already set. + +LIBS3_VER_MAJOR ?= 4 +LIBS3_VER_MINOR ?= 1 +LIBS3_VER := $(LIBS3_VER_MAJOR).$(LIBS3_VER_MINOR) + + +# ----------------------------------------------------------------------------- +# Determine verbosity. VERBOSE_SHOW should be prepended to every command which +# should only be displayed if VERBOSE is set. QUIET_ECHO may be used to +# echo text only if VERBOSE is not set. Typically, a VERBOSE_SHOW command will +# be paired with a QUIET_ECHO command, to provide a command which is displayed +# in VERBOSE mode, along with text which is displayed in non-VERBOSE mode to +# describe the command. +# +# No matter what VERBOSE is defined to, it ends up as true if it's defined. +# This will be weird if you defined VERBOSE=false in the environment, and we +# switch it to true here; but the meaning of VERBOSE is, "if it's defined to +# any value, then verbosity is turned on". So don't define VERBOSE if you +# don't want verbosity in the build process. +# ----------------------------------------------------------------------------- + +ifdef VERBOSE + VERBOSE = true + VERBOSE_ECHO = @ echo + VERBOSE_SHOW = + QUIET_ECHO = @ echo > /dev/null +else + VERBOSE = false + VERBOSE_ECHO = @ echo > /dev/null + VERBOSE_SHOW = @ + QUIET_ECHO = @ echo +endif + + +# -------------------------------------------------------------------------- +# BUILD directory +ifndef BUILD + ifdef DEBUG + BUILD := build-debug + else + BUILD := build + endif +endif + + +# -------------------------------------------------------------------------- +# DESTDIR directory +ifndef DESTDIR + DESTDIR := ${HOME}/.cos-local.2 +endif + +# -------------------------------------------------------------------------- +# LIBDIR directory +ifndef LIBDIR + LIBDIR := ${DESTDIR}/lib +endif + +# -------------------------------------------------------------------------- +# Compiler CC handling +ifndef CC + CC := gcc +endif + +# -------------------------------------------------------------------------- +# Acquire configuration information for libraries that libs3 depends upon + +ifndef CURL_LIBS + CURL_LIBS := $(shell curl-config --libs) +endif + +ifndef CURL_CFLAGS + CURL_CFLAGS := $(shell curl-config --cflags) +endif + +ifndef LIBXML2_LIBS + LIBXML2_LIBS := $(shell xml2-config --libs) +endif + +ifndef LIBXML2_CFLAGS + LIBXML2_CFLAGS := $(shell xml2-config --cflags) +endif + +ifndef OPENSSL_LIBS + OPENSSL_LIBS := -lssl -lcrypto +endif + +# -------------------------------------------------------------------------- +# These CFLAGS assume a GNU compiler. For other compilers, write a script +# which converts these arguments into their equivalent for that particular +# compiler. + +ifndef CFLAGS + ifdef DEBUG + CFLAGS := -g + else + CFLAGS := -O3 + endif +endif + +CFLAGS += -Wall -Werror -Wshadow -Wextra -Iinc \ + $(CURL_CFLAGS) $(LIBXML2_CFLAGS) \ + -DLIBS3_VER_MAJOR=\"$(LIBS3_VER_MAJOR)\" \ + -DLIBS3_VER_MINOR=\"$(LIBS3_VER_MINOR)\" \ + -DLIBS3_VER=\"$(LIBS3_VER)\" \ + -D__STRICT_ANSI__ \ + -D_ISOC99_SOURCE \ + -D_POSIX_C_SOURCE=200112L + +LDFLAGS = $(CURL_LIBS) $(LIBXML2_LIBS) $(OPENSSL_LIBS) -lpthread + +STRIP ?= strip +INSTALL := install --strip-program=$(STRIP) + + +# -------------------------------------------------------------------------- +# Default targets are everything + +.PHONY: all +all: exported test + + +# -------------------------------------------------------------------------- +# Exported targets are the library and driver program + +.PHONY: exported +exported: libs3 s3 headers +exported_static: $(LIBS3_STATIC) + +# -------------------------------------------------------------------------- +# Install target + +.PHONY: install install_static +install_static: exported_static + $(QUIET_ECHO) $(LIBDIR)/libs3.a: Installing static library + $(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/lib/libs3.a \ + $(LIBDIR)/libs3.a + $(QUIET_ECHO) $(DESTDIR)/include/libs3.h: Installing header + $(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r inc/libs3.h \ + $(DESTDIR)/include/libs3.h + +install: exported + $(QUIET_ECHO) $(DESTDIR)/bin/s3: Installing executable + $(VERBOSE_SHOW) $(INSTALL) -Dps -m u+rwx,go+rx $(BUILD)/bin/s3 \ + $(DESTDIR)/bin/s3 + $(QUIET_ECHO) \ + $(LIBDIR)/libs3.so.$(LIBS3_VER): Installing shared library + $(VERBOSE_SHOW) $(INSTALL) -Dps -m u+rw,go+r \ + $(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR) \ + $(LIBDIR)/libs3.so.$(LIBS3_VER) + $(QUIET_ECHO) \ + $(LIBDIR)/libs3.so.$(LIBS3_VER_MAJOR): Linking shared library + $(VERBOSE_SHOW) ln -sf libs3.so.$(LIBS3_VER) \ + $(LIBDIR)/libs3.so.$(LIBS3_VER_MAJOR) + $(QUIET_ECHO) $(LIBDIR)/libs3.so: Linking shared library + $(VERBOSE_SHOW) ln -sf libs3.so.$(LIBS3_VER_MAJOR) $(LIBDIR)/libs3.so + $(QUIET_ECHO) $(LIBDIR)/libs3.a: Installing static library + $(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/lib/libs3.a \ + $(LIBDIR)/libs3.a + $(QUIET_ECHO) $(DESTDIR)/include/libs3.h: Installing header + $(VERBOSE_SHOW) $(INSTALL) -Dp -m u+rw,go+r $(BUILD)/include/libs3.h \ + $(DESTDIR)/include/libs3.h + + +# -------------------------------------------------------------------------- +# Uninstall target + +.PHONY: uninstall +uninstall: + $(QUIET_ECHO) Installed files: Uninstalling + $(VERBOSE_SHOW) \ + rm -f $(DESTDIR)/bin/s3 \ + $(DESTDIR)/include/libs3.h \ + $(DESTDIR)/lib/libs3.a \ + $(DESTDIR)/lib/libs3.so \ + $(DESTDIR)/lib/libs3.so.$(LIBS3_VER_MAJOR) \ + $(DESTDIR)/lib/libs3.so.$(LIBS3_VER) + + +# -------------------------------------------------------------------------- +# Compile target patterns + +$(BUILD)/obj/%.o: src/%.c + $(QUIET_ECHO) $@: Compiling object + @ mkdir -p $(dir $(BUILD)/dep/$<) + @ $(CC) $(CFLAGS) -M -MG -MQ $@ -DCOMPILINGDEPENDENCIES \ + -o $(BUILD)/dep/$(<:%.c=%.d) -c $< + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(CC) $(CFLAGS) -o $@ -c $< + +$(BUILD)/obj/%.do: src/%.c + $(QUIET_ECHO) $@: Compiling dynamic object + $(QUIET_ECHO) cflags:${CFLAGS} + @ mkdir -p $(dir $(BUILD)/dep/$<) + @ $(CC) $(CFLAGS) -M -MG -MQ $@ -DCOMPILINGDEPENDENCIES \ + -o $(BUILD)/dep/$(<:%.c=%.dd) -c $< + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(CC) $(CFLAGS) -fpic -fPIC -o $@ -c $< + + +# -------------------------------------------------------------------------- +# libs3 library targets + +LIBS3_SHARED = $(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR) +LIBS3_STATIC = $(BUILD)/lib/libs3.a + +.PHONY: libs3 +libs3: $(LIBS3_SHARED) $(LIBS3_STATIC) + +LIBS3_SOURCES := bucket.c bucket_metadata.c error_parser.c general.c \ + object.c request.c request_context.c \ + response_headers_handler.c service_access_logging.c \ + service.c simplexml.c util.c multipart.c + +$(LIBS3_SHARED): $(LIBS3_SOURCES:%.c=$(BUILD)/obj/%.do) + $(QUIET_ECHO) $@: Building shared library + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(CC) -shared -Wl,-soname,libs3.so.$(LIBS3_VER_MAJOR) \ + -o $@ $^ $(LDFLAGS) + +$(LIBS3_STATIC): $(LIBS3_SOURCES:%.c=$(BUILD)/obj/%.o) + $(QUIET_ECHO) $@: Building static library + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(AR) cr $@ $^ + + +# -------------------------------------------------------------------------- +# Driver program targets + +.PHONY: s3 +s3: $(BUILD)/bin/s3 + +$(BUILD)/bin/s3: $(BUILD)/obj/s3.o $(LIBS3_SHARED) + $(QUIET_ECHO) $@: Building executable + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(CC) -o $@ $^ $(LDFLAGS) + + +# -------------------------------------------------------------------------- +# libs3 header targets + +.PHONY: headers +headers: $(BUILD)/include/libs3.h + +$(BUILD)/include/libs3.h: inc/libs3.h + $(QUIET_ECHO) $@: Linking header + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) ln -sf $(abspath $<) $@ + + +# -------------------------------------------------------------------------- +# Test targets + +.PHONY: test +test: $(BUILD)/bin/testsimplexml + +$(BUILD)/bin/testsimplexml: $(BUILD)/obj/testsimplexml.o $(LIBS3_STATIC) + $(QUIET_ECHO) $@: Building executable + @ mkdir -p $(dir $@) + $(VERBOSE_SHOW) $(CC) -o $@ $^ $(LIBXML2_LIBS) + + +# -------------------------------------------------------------------------- +# Clean target + +.PHONY: clean +clean: + $(QUIET_ECHO) $(BUILD): Cleaning + $(VERBOSE_SHOW) rm -rf $(BUILD) + +.PHONY: distclean +distclean: + $(QUIET_ECHO) $(BUILD): Cleaning + $(VERBOSE_SHOW) rm -rf $(BUILD) + + +# -------------------------------------------------------------------------- +# Clean dependencies target + +.PHONY: cleandeps +cleandeps: + $(QUIET_ECHO) $(BUILD)/dep: Cleaning dependencies + $(VERBOSE_SHOW) rm -rf $(BUILD)/dep + + +# -------------------------------------------------------------------------- +# Dependencies + +ALL_SOURCES := $(LIBS3_SOURCES) s3.c testsimplexml.c + +$(foreach i, $(ALL_SOURCES), $(eval -include $(BUILD)/dep/src/$(i:%.c=%.d))) +$(foreach i, $(ALL_SOURCES), $(eval -include $(BUILD)/dep/src/$(i:%.c=%.dd))) + + +# -------------------------------------------------------------------------- +# Debian package target + +DEBPKG = $(BUILD)/pkg/libs3_$(LIBS3_VER).deb +DEBDEVPKG = $(BUILD)/pkg/libs3-dev_$(LIBS3_VER).deb + +.PHONY: deb +deb: $(DEBPKG) $(DEBDEVPKG) + +$(DEBPKG): DEBARCH = $(shell dpkg-architecture | grep ^DEB_BUILD_ARCH= | \ + cut -d '=' -f 2) +$(DEBPKG): exported $(BUILD)/deb/DEBIAN/control $(BUILD)/deb/DEBIAN/shlibs \ + $(BUILD)/deb/DEBIAN/postinst \ + $(BUILD)/deb/usr/share/doc/libs3/changelog.gz \ + $(BUILD)/deb/usr/share/doc/libs3/changelog.Debian.gz \ + $(BUILD)/deb/usr/share/doc/libs3/copyright + DESTDIR=$(BUILD)/deb/usr $(MAKE) install + rm -rf $(BUILD)/deb/usr/include + rm -f $(BUILD)/deb/usr/lib/libs3.a + @mkdir -p $(dir $@) + fakeroot dpkg-deb -b $(BUILD)/deb $@ + mv $@ $(BUILD)/pkg/libs3_$(LIBS3_VER)_$(DEBARCH).deb + +$(DEBDEVPKG): DEBARCH = $(shell dpkg-architecture | grep ^DEB_BUILD_ARCH= | \ + cut -d '=' -f 2) +$(DEBDEVPKG): exported $(BUILD)/deb-dev/DEBIAN/control \ + $(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.gz \ + $(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.Debian.gz \ + $(BUILD)/deb-dev/usr/share/doc/libs3-dev/copyright + DESTDIR=$(BUILD)/deb-dev/usr $(MAKE) install + rm -rf $(BUILD)/deb-dev/usr/bin + rm -f $(BUILD)/deb-dev/usr/lib/libs3.so* + @mkdir -p $(dir $@) + fakeroot dpkg-deb -b $(BUILD)/deb-dev $@ + mv $@ $(BUILD)/pkg/libs3-dev_$(LIBS3_VER)_$(DEBARCH).deb + +$(BUILD)/deb/DEBIAN/control: debian/control + @mkdir -p $(dir $@) + echo -n "Depends: " > $@ + dpkg-shlibdeps -Sbuild -O $(BUILD)/lib/libs3.so.$(LIBS3_VER_MAJOR) | \ + cut -d '=' -f 2- >> $@ + sed -e 's/LIBS3_VERSION/$(LIBS3_VER)/' \ + < $< | sed -e 's/DEBIAN_ARCHITECTURE/$(DEBARCH)/' | \ + grep -v ^Source: >> $@ + +$(BUILD)/deb-dev/DEBIAN/control: debian/control.dev + @mkdir -p $(dir $@) + sed -e 's/LIBS3_VERSION/$(LIBS3_VER)/' \ + < $< | sed -e 's/DEBIAN_ARCHITECTURE/$(DEBARCH)/' > $@ + +$(BUILD)/deb/DEBIAN/shlibs: + echo -n "libs3 $(LIBS3_VER_MAJOR) libs3 " > $@ + echo "(>= $(LIBS3_VER))" >> $@ + +$(BUILD)/deb/DEBIAN/postinst: debian/postinst + @mkdir -p $(dir $@) + cp $< $@ + +$(BUILD)/deb/usr/share/doc/libs3/copyright: LICENSE + @mkdir -p $(dir $@) + cp $< $@ + @echo >> $@ + @echo -n "An alternate location for the GNU General Public " >> $@ + @echo "License version 3 on Debian" >> $@ + @echo "systems is /usr/share/common-licenses/GPL-3." >> $@ + +$(BUILD)/deb-dev/usr/share/doc/libs3-dev/copyright: LICENSE + @mkdir -p $(dir $@) + cp $< $@ + @echo >> $@ + @echo -n "An alternate location for the GNU General Public " >> $@ + @echo "License version 3 on Debian" >> $@ + @echo "systems is /usr/share/common-licenses/GPL-3." >> $@ + +$(BUILD)/deb/usr/share/doc/libs3/changelog.gz: debian/changelog + @mkdir -p $(dir $@) + gzip --best -c $< > $@ + +$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.gz: debian/changelog + @mkdir -p $(dir $@) + gzip --best -c $< > $@ + +$(BUILD)/deb/usr/share/doc/libs3/changelog.Debian.gz: debian/changelog.Debian + @mkdir -p $(dir $@) + gzip --best -c $< > $@ + +$(BUILD)/deb-dev/usr/share/doc/libs3-dev/changelog.Debian.gz: \ + debian/changelog.Debian + @mkdir -p $(dir $@) + gzip --best -c $< > $@ + + diff --git a/cmake/libs3_CMakeLists.txt.in b/cmake/libs3_CMakeLists.txt.in new file mode 100644 index 0000000000..f2b6cac953 --- /dev/null +++ b/cmake/libs3_CMakeLists.txt.in @@ -0,0 +1,16 @@ +# libs3 + +ExternalProject_Add(libs3 + GIT_REPOSITORY https://github.com/bji/libs3 + #GIT_TAG v5.0.16 + DEPENDS curl2 xml2 + SOURCE_DIR "${TD_CONTRIB_DIR}/libs3" + #BINARY_DIR "" + BUILD_IN_SOURCE TRUE + BUILD_ALWAYS 1 + UPDATE_COMMAND "" + CONFIGURE_COMMAND cp ${TD_SUPPORT_DIR}/libs3.GNUmakefile GNUmakefile && sed -i "s|CFLAGS += -Wall -Werror|CFLAGS += -I'$ENV{HOME}/.cos-local.2/include' -L'$ENV{HOME}/.cos-local.2/lib' |" ./GNUmakefile + BUILD_COMMAND make build/lib/libs3.a + INSTALL_COMMAND make install_static + TEST_COMMAND "" +) diff --git a/cmake/ssl_CMakeLists.txt.in b/cmake/ssl_CMakeLists.txt.in new file mode 100644 index 0000000000..c9f836bade --- /dev/null +++ b/cmake/ssl_CMakeLists.txt.in @@ -0,0 +1,15 @@ +# openssl +ExternalProject_Add(openssl + URL https://www.openssl.org/source/openssl-3.1.3.tar.gz + URL_HASH SHA256=f0316a2ebd89e7f2352976445458689f80302093788c466692fb2a188b2eacf6 + DOWNLOAD_NO_PROGRESS 1 + DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" + SOURCE_DIR "${TD_CONTRIB_DIR}/openssl" + BUILD_IN_SOURCE TRUE + #BUILD_ALWAYS 1 + #UPDATE_COMMAND "" + CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 -static #--no-shared + BUILD_COMMAND make -j + INSTALL_COMMAND make install_sw -j + TEST_COMMAND "" +) diff --git a/cmake/xml2_CMakeLists.txt.in b/cmake/xml2_CMakeLists.txt.in new file mode 100644 index 0000000000..ad0704cdb9 --- /dev/null +++ b/cmake/xml2_CMakeLists.txt.in @@ -0,0 +1,18 @@ + +# xml2 +ExternalProject_Add(xml2 + URL https://download.gnome.org/sources/libxml2/2.11/libxml2-2.11.5.tar.xz + URL_HASH SHA256=3727b078c360ec69fa869de14bd6f75d7ee8d36987b071e6928d4720a28df3a6 + #https://github.com/GNOME/libxml2/archive/refs/tags/v2.11.5.tar.gz + #GIT_REPOSITORY https://github.com/GNOME/libxml2 + #GIT_TAG v2.11.5 + DOWNLOAD_NO_PROGRESS 1 + DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" + SOURCE_DIR "${TD_CONTRIB_DIR}/xml2" + #BINARY_DIR "" + BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --enable-shared=no --enable-static=yes --without-python --without-lzma + BUILD_COMMAND make -j + INSTALL_COMMAND make install && ln -s $ENV{HOME}/.cos-local.2/include/libxml2/libxml $ENV{HOME}/.cos-local.2/include/libxml + TEST_COMMAND "" + ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index a963e4497f..c5715bd53f 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -6,7 +6,10 @@ function(cat IN_FILE OUT_FILE) file(APPEND ${OUT_FILE} "${CONTENTS}") endfunction(cat IN_FILE OUT_FILE) -if(${TD_LINUX}) +if(${BUILD_WITH_S3}) + file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/) + +elseif(${BUILD_WITH_COS}) set(CONTRIB_TMP_FILE3 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in3") configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) @@ -37,7 +40,8 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . execute_process(COMMAND "${CMAKE_COMMAND}" --build . WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") -endif(${TD_LINUX}) +endif() + set(CONTRIB_TMP_FILE "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in") configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -155,15 +159,24 @@ if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_SQLITE}) +# s3 +if(${BUILD_WITH_S3}) + cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + add_definitions(-DUSE_S3) + # cos -if(${BUILD_WITH_COS}) +elseif(${BUILD_WITH_COS}) #cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) #cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) #cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) #cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_COS) -endif(${BUILD_WITH_COS}) + +endif() # lucene if(${BUILD_WITH_LUCENE}) @@ -231,7 +244,6 @@ if(${BUILD_TEST}) ) endif(${TD_DARWIN}) - endif(${BUILD_TEST}) # cJson @@ -248,6 +260,11 @@ target_include_directories( ) unset(CMAKE_PROJECT_INCLUDE_BEFORE) +# xml2 +#if(${BUILD_WITH_S3}) +# add_subdirectory(xml2 EXCLUDE_FROM_ALL) +#endif() + # lz4 add_subdirectory(lz4/build/cmake EXCLUDE_FROM_ALL) target_include_directories( @@ -390,14 +407,18 @@ if (${BUILD_WITH_ROCKSDB}) endif() endif() +if(${BUILD_WITH_S3}) + INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.2/include) + MESSAGE("build with s3: ${BUILD_WITH_S3}") + # cos -if(${BUILD_WITH_COS}) +elseif(${BUILD_WITH_COS}) if(${TD_LINUX}) set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.1) #ADD_DEFINITIONS(-DMINIXML_LIBRARY=${CMAKE_BINARY_DIR}/build/lib/libxml.a) option(ENABLE_TEST "Enable the tests" OFF) INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include) - MESSAGE("$ENV{HOME}/.cos-local.1/include") + #MESSAGE("$ENV{HOME}/.cos-local.1/include") set(CMAKE_BUILD_TYPE Release) set(ORIG_CMAKE_PROJECT_NAME ${CMAKE_PROJECT_NAME}) @@ -413,7 +434,8 @@ if(${BUILD_WITH_COS}) else() endif(${TD_LINUX}) -endif(${BUILD_WITH_COS}) + +endif() # lucene # To support build on ubuntu: sudo apt-get install libboost-all-dev diff --git a/docs/en/14-reference/13-schemaless/13-schemaless.md b/docs/en/14-reference/13-schemaless/13-schemaless.md index 11d3c0d0ab..eb336f4633 100644 --- a/docs/en/14-reference/13-schemaless/13-schemaless.md +++ b/docs/en/14-reference/13-schemaless/13-schemaless.md @@ -94,7 +94,7 @@ The string's MD5 hash value "md5_val" is calculated after the ranking is complet ::: If you do not want to use an automatically generated table name, there are two ways to specify sub table names, the first one has a higher priority. -You can configure smlAutoChildTableNameDelimiter in taos.cfg, for example, `smlAutoChildTableNameDelimiter=tname`. You can insert `st,t0=cpul,t1=4 c1=3 1626006833639000000` and the table name will be cpu1-4. +You can configure smlAutoChildTableNameDelimiter in taos.cfg(except for `@ # space \r \t \n`), for example, `smlAutoChildTableNameDelimiter=tname`. You can insert `st,t0=cpul,t1=4 c1=3 1626006833639000000` and the table name will be cpu1-4. You can configure smlChildTableName in taos.cfg to specify table names, for example, `smlChildTableName=tname`. You can insert `st,tname=cpul,t1=4 c1=3 1626006833639000000` and the cpu1 table will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored. 2. If the super table obtained by parsing the line protocol does not exist, this super table is created. diff --git a/docs/zh/14-reference/13-schemaless/13-schemaless.md b/docs/zh/14-reference/13-schemaless/13-schemaless.md index 084e31d810..723531676c 100644 --- a/docs/zh/14-reference/13-schemaless/13-schemaless.md +++ b/docs/zh/14-reference/13-schemaless/13-schemaless.md @@ -96,7 +96,7 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000 排列完成以后计算该字符串的 MD5 散列值 "md5_val"。然后将计算的结果与字符串组合生成表名:“t_md5_val”。其中的 “t_” 是固定的前缀,每个通过该映射关系自动生成的表都具有该前缀。 :::tip 如果不想用自动生成的表名,有两种指定子表名的方式,第一种优先级更高: -通过在taos.cfg里配置 smlAutoChildTableNameDelimiter 参数来指定。 +通过在taos.cfg里配置 smlAutoChildTableNameDelimiter 参数来指定(`@ # 空格 回车 换行 制表符`除外)。 举例如下:配置 smlAutoChildTableNameDelimiter=- 插入数据为 st,t0=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1-4。 通过在taos.cfg里配置 smlChildTableName 参数来指定。 举例如下:配置 smlChildTableName=tname 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1,注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。 diff --git a/include/util/tcache.h b/include/util/tcache.h index d8ab018570..474e5274de 100644 --- a/include/util/tcache.h +++ b/include/util/tcache.h @@ -151,6 +151,8 @@ void *taosCacheIterGetData(const SCacheIter *pIter, size_t *dataLen); void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *keyLen); void taosCacheDestroyIter(SCacheIter *pIter); +void taosCacheTryExtendLifeSpan(SCacheObj *pCacheObj, void **data); + #ifdef __cplusplus } #endif diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index b010467f20..c35845f9df 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -17,12 +17,11 @@ IF (TD_STORAGE) ADD_DEFINITIONS(-D_STORAGE) TARGET_LINK_LIBRARIES(common PRIVATE storage) - IF(${TD_LINUX}) - IF(${BUILD_WITH_COS}) - add_definitions(-DUSE_COS) - ENDIF(${BUILD_WITH_COS}) - - ENDIF(${TD_LINUX}) + IF(${BUILD_WITH_S3}) + add_definitions(-DUSE_S3) + ELSEIF(${BUILD_WITH_COS}) + add_definitions(-DUSE_COS) + ENDIF() ENDIF () diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ed1776dc45..d9b1b2e0ed 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -15,12 +15,12 @@ #define _DEFAULT_SOURCE #include "tglobal.h" +#include "defines.h" #include "os.h" #include "tconfig.h" #include "tgrant.h" #include "tlog.h" #include "tmisce.h" -#include "defines.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -222,7 +222,7 @@ float tsFPrecision = 1E-8; // float column precision double tsDPrecision = 1E-16; // double column precision uint32_t tsMaxRange = 500; // max quantization intervals uint32_t tsCurRange = 100; // current quantization intervals -bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom +bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR // udf @@ -267,6 +267,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int8_t tsS3Https = true; +char tsS3Hostname[TSDB_FQDN_LEN] = ""; + int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockCacheSize = 16; // number of blocks @@ -308,6 +311,14 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN); tstrncpy(tsS3Endpoint, cfgGetItem(pCfg, "s3Endpoint")->str, TSDB_FQDN_LEN); tstrncpy(tsS3BucketName, cfgGetItem(pCfg, "s3BucketName")->str, TSDB_FQDN_LEN); + char *proto = strstr(tsS3Endpoint, "https://"); + if (!proto) { + tsS3Https = false; + tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN); + } else { + tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN); + } + char *cos = strstr(tsS3Endpoint, "cos."); if (cos) { char *appid = strrchr(tsS3BucketName, '-'); @@ -319,7 +330,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { } } if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) { -#ifdef USE_COS +#if defined(USE_COS) || defined(USE_S3) tsS3Enabled = true; #endif } @@ -1091,7 +1102,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval; tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor)); - tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; @@ -1680,14 +1690,14 @@ void taosCfgDynamicOptions(const char *option, const char *value) { } return; } - + /* cannot alter s3BlockSize if (strcasecmp(option, "s3BlockSize") == 0) { int32_t newS3BlockSize = atoi(value); uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize); tsS3BlockSize = newS3BlockSize; return; } - + */ if (strcasecmp(option, "s3BlockCacheSize") == 0) { int32_t newS3BlockCacheSize = atoi(value); uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 1f8c3b161b..4366053237 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -59,7 +59,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId); -static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); +static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq); @@ -79,7 +79,7 @@ int32_t mndInitProfile(SMnode *pMnode) { // in ms int32_t checkTime = tsShellActivityTimer * 2 * 1000; - pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, true, (__cache_free_fn_t)mndFreeConn, "conn"); + pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, false, (__cache_free_fn_t)mndFreeConn, "conn"); if (pMgmt->connCache == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to alloc profile cache since %s", terrstr()); @@ -185,11 +185,12 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) { return pConn; } -static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { +static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn, bool extendLifespan) { if (pConn == NULL) return; mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn); SProfileMgmt *pMgmt = &pMnode->profileMgmt; + if (extendLifespan) taosCacheTryExtendLifeSpan(pMgmt->connCache, (void **)&pConn); taosCacheRelease(pMgmt->connCache, (void **)&pConn, false); } @@ -323,7 +324,7 @@ _OVER: mndReleaseUser(pMnode, pUser); mndReleaseDb(pMnode, pDb); - mndReleaseConn(pMnode, pConn); + mndReleaseConn(pMnode, pConn, true); return code; } @@ -485,7 +486,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); if (rspBasic == NULL) { - mndReleaseConn(pMnode, pConn); + mndReleaseConn(pMnode, pConn, true); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr()); return -1; @@ -508,7 +509,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1); - mndReleaseConn(pMnode, pConn); + mndReleaseConn(pMnode, pConn, true); hbRsp.query = rspBasic; } else { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1d8dd5e345..29a8ae1f29 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -428,6 +428,8 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), pTrans->startFunc); + taosThreadMutexInit(&pTrans->mutex, NULL); + if (pTrans->startFunc > 0) { TransCbFp fp = mndTransGetCbFp(pTrans->startFunc); if (fp) { @@ -543,10 +545,6 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId); if (pTrans == NULL) { terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; - } else { - #ifdef WINDOWS - taosThreadMutexInit(&pTrans->mutex, NULL); - #endif } return pTrans; } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 84d54f3350..dcc9f9a115 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -161,7 +161,38 @@ target_link_libraries( PUBLIC index ) -if(${TD_LINUX}) +if(${BUILD_S3}) + +if(${BUILD_WITH_S3}) + target_include_directories( + vnode + + PUBLIC "$ENV{HOME}/.cos-local.2/include" + ) + + set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") + set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) + find_library(S3_LIBRARY s3) + find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(XML2_LIBRARY xml2) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) + target_link_libraries( + vnode + + # s3 + PUBLIC ${S3_LIBRARY} + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + PUBLIC ${XML2_LIBRARY} + ) + + add_definitions(-DUSE_S3) +endif() + +if(${BUILD_WITH_COS}) + set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/) find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/) @@ -194,11 +225,10 @@ target_include_directories( PUBLIC "$ENV{HOME}/.cos-local.1/include" ) -if(${BUILD_WITH_COS}) add_definitions(-DUSE_COS) endif(${BUILD_WITH_COS}) -endif(${TD_LINUX}) +endif() IF (TD_GRANT) TARGET_LINK_LIBRARIES(vnode PUBLIC grant) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 5f650be97d..6e36739f5a 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -2,13 +2,798 @@ #include "vndCos.h" -extern char tsS3Endpoint[]; -extern char tsS3AccessKeyId[]; -extern char tsS3AccessKeySecret[]; -extern char tsS3BucketName[]; -extern char tsS3AppId[]; +extern char tsS3Endpoint[]; +extern char tsS3AccessKeyId[]; +extern char tsS3AccessKeySecret[]; +extern char tsS3BucketName[]; +extern char tsS3AppId[]; +extern char tsS3Hostname[]; +extern int8_t tsS3Https; + +#if defined(USE_S3) + +#include "libs3.h" + +static int verifyPeerG = 0; +static const char *awsRegionG = NULL; +static int forceG = 0; +static int showResponsePropertiesG = 0; +static S3Protocol protocolG = S3ProtocolHTTPS; +// static S3Protocol protocolG = S3ProtocolHTTP; +static S3UriStyle uriStyleG = S3UriStylePath; +static int retriesG = 5; +static int timeoutMsG = 0; + +static int32_t s3Begin() { + S3Status status; + const char *hostname = tsS3Hostname; + const char *env_hn = getenv("S3_HOSTNAME"); + + if (env_hn) { + hostname = env_hn; + } + + if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) { + vError("Failed to initialize libs3: %s\n", S3_get_status_name(status)); + return -1; + } + + protocolG = !tsS3Https; + + return 0; +} + +static void s3End() { S3_deinitialize(); } +int32_t s3Init() { return s3Begin(); } + +void s3CleanUp() { s3End(); } + +static int should_retry() { + /* + if (retriesG--) { + // Sleep before next retry; start out with a 1 second sleep + static int retrySleepInterval = 1 * SLEEP_UNITS_PER_SECOND; + sleep(retrySleepInterval); + // Next sleep 1 second longer + retrySleepInterval++; + return 1; + } + */ + + return 0; +} + +static void s3PrintError(const char *func, S3Status status, char error_details[]) { + if (status < S3StatusErrorAccessDenied) { + vError("%s: %s", __func__, S3_get_status_name(status)); + } else { + vError("%s: %s, %s", __func__, S3_get_status_name(status), error_details); + } +} + +typedef struct { + char err_msg[128]; + S3Status status; + uint64_t content_length; + char *buf; +} TS3SizeCBD; + +static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { + //(void)callbackData; + TS3SizeCBD *cbd = callbackData; + if (properties->contentLength > 0) { + cbd->content_length = properties->contentLength; + } else { + cbd->content_length = 0; + } + + return S3StatusOK; +} + +static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) { + TS3SizeCBD *cbd = callbackData; + cbd->status = status; + + int len = 0; + const int elen = sizeof(cbd->err_msg); + if (error) { + if (error->message) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message); + } + if (error->resource) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource); + } + if (error->furtherDetails) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails); + } + if (error->extraDetailsCount) { + len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n"); + for (int i = 0; i < error->extraDetailsCount; i++) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, + error->extraDetails[i].value); + } + } + } +} + +typedef struct growbuffer { + // The total number of bytes, and the start byte + int size; + // The start byte + int start; + // The blocks + char data[64 * 1024]; + struct growbuffer *prev, *next; +} growbuffer; + +// returns nonzero on success, zero on out of memory +static int growbuffer_append(growbuffer **gb, const char *data, int dataLen) { + int origDataLen = dataLen; + while (dataLen) { + growbuffer *buf = *gb ? (*gb)->prev : 0; + if (!buf || (buf->size == sizeof(buf->data))) { + buf = (growbuffer *)malloc(sizeof(growbuffer)); + if (!buf) { + return 0; + } + buf->size = 0; + buf->start = 0; + if (*gb && (*gb)->prev) { + buf->prev = (*gb)->prev; + buf->next = *gb; + (*gb)->prev->next = buf; + (*gb)->prev = buf; + } else { + buf->prev = buf->next = buf; + *gb = buf; + } + } + + int toCopy = (sizeof(buf->data) - buf->size); + if (toCopy > dataLen) { + toCopy = dataLen; + } + + memcpy(&(buf->data[buf->size]), data, toCopy); + + buf->size += toCopy, data += toCopy, dataLen -= toCopy; + } + + return origDataLen; +} + +static void growbuffer_read(growbuffer **gb, int amt, int *amtReturn, char *buffer) { + *amtReturn = 0; + + growbuffer *buf = *gb; + + if (!buf) { + return; + } + + *amtReturn = (buf->size > amt) ? amt : buf->size; + + memcpy(buffer, &(buf->data[buf->start]), *amtReturn); + + buf->start += *amtReturn, buf->size -= *amtReturn; + + if (buf->size == 0) { + if (buf->next == buf) { + *gb = 0; + } else { + *gb = buf->next; + buf->prev->next = buf->next; + buf->next->prev = buf->prev; + } + free(buf); + buf = NULL; + } +} + +static void growbuffer_destroy(growbuffer *gb) { + growbuffer *start = gb; + + while (gb) { + growbuffer *next = gb->next; + free(gb); + gb = (next == start) ? 0 : next; + } +} + +typedef struct put_object_callback_data { + char err_msg[128]; + S3Status status; + // FILE *infile; + TdFilePtr infileFD; + growbuffer *gb; + uint64_t contentLength, originalContentLength; + uint64_t totalContentLength, totalOriginalContentLength; + int noStatus; +} put_object_callback_data; + +#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M + +typedef struct UploadManager { + char err_msg[128]; + S3Status status; + // used for initial multipart + char *upload_id; + + // used for upload part object + char **etags; + int next_etags_pos; + + // used for commit Upload + growbuffer *gb; + int remaining; +} UploadManager; + +typedef struct list_parts_callback_data { + char err_msg[128]; + S3Status status; + int isTruncated; + char nextPartNumberMarker[24]; + char initiatorId[256]; + char initiatorDisplayName[256]; + char ownerId[256]; + char ownerDisplayName[256]; + char storageClass[256]; + int partsCount; + int handlePartsStart; + int allDetails; + int noPrint; + UploadManager *manager; +} list_parts_callback_data; + +typedef struct MultipartPartData { + char err_msg[128]; + S3Status status; + put_object_callback_data put_object_data; + int seq; + UploadManager *manager; +} MultipartPartData; + +static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { + put_object_callback_data *data = (put_object_callback_data *)callbackData; + if (data->infileFD == 0) { + MultipartPartData *mpd = (MultipartPartData *)callbackData; + data = &mpd->put_object_data; + } + + int ret = 0; + + if (data->contentLength) { + int toRead = ((data->contentLength > (unsigned)bufferSize) ? (unsigned)bufferSize : data->contentLength); + if (data->gb) { + growbuffer_read(&(data->gb), toRead, &ret, buffer); + } else if (data->infileFD) { + // ret = fread(buffer, 1, toRead, data->infile); + ret = taosReadFile(data->infileFD, buffer, toRead); + } + } + + data->contentLength -= ret; + data->totalContentLength -= ret; + /* log too many open files + if (data->contentLength && !data->noStatus) { + vTrace("%llu bytes remaining ", (unsigned long long)data->totalContentLength); + vTrace("(%d%% complete) ...\n", (int)(((data->totalOriginalContentLength - data->totalContentLength) * 100) / + data->totalOriginalContentLength)); + } + */ + return ret; +} + +S3Status initial_multipart_callback(const char *upload_id, void *callbackData) { + UploadManager *manager = (UploadManager *)callbackData; + manager->upload_id = strdup(upload_id); + return S3StatusOK; +} + +S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) { + responsePropertiesCallback(properties, callbackData); + + MultipartPartData *data = (MultipartPartData *)callbackData; + int seq = data->seq; + const char *etag = properties->eTag; + data->manager->etags[seq - 1] = strdup(etag); + data->manager->next_etags_pos = seq; + return S3StatusOK; +} + +static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) { + UploadManager *manager = (UploadManager *)callbackData; + int ret = 0; + + if (manager->remaining) { + int toRead = ((manager->remaining > bufferSize) ? bufferSize : manager->remaining); + growbuffer_read(&(manager->gb), toRead, &ret, buffer); + } + manager->remaining -= ret; + return ret; +} +/* +static S3Status listPartsCallback(int isTruncated, const char *nextPartNumberMarker, const char *initiatorId, + const char *initiatorDisplayName, const char *ownerId, const char *ownerDisplayName, + const char *storageClass, int partsCount, int handlePartsStart, + const S3ListPart *parts, void *callbackData) { + list_parts_callback_data *data = (list_parts_callback_data *)callbackData; + + data->isTruncated = isTruncated; + data->handlePartsStart = handlePartsStart; + UploadManager *manager = data->manager; + + if (nextPartNumberMarker) { + snprintf(data->nextPartNumberMarker, sizeof(data->nextPartNumberMarker), "%s", nextPartNumberMarker); + } else { + data->nextPartNumberMarker[0] = 0; + } + + if (initiatorId) { + snprintf(data->initiatorId, sizeof(data->initiatorId), "%s", initiatorId); + } else { + data->initiatorId[0] = 0; + } + + if (initiatorDisplayName) { + snprintf(data->initiatorDisplayName, sizeof(data->initiatorDisplayName), "%s", initiatorDisplayName); + } else { + data->initiatorDisplayName[0] = 0; + } + + if (ownerId) { + snprintf(data->ownerId, sizeof(data->ownerId), "%s", ownerId); + } else { + data->ownerId[0] = 0; + } + + if (ownerDisplayName) { + snprintf(data->ownerDisplayName, sizeof(data->ownerDisplayName), "%s", ownerDisplayName); + } else { + data->ownerDisplayName[0] = 0; + } + + if (storageClass) { + snprintf(data->storageClass, sizeof(data->storageClass), "%s", storageClass); + } else { + data->storageClass[0] = 0; + } + + if (partsCount && !data->partsCount && !data->noPrint) { + // printListPartsHeader(); + } + + int i; + for (i = 0; i < partsCount; i++) { + const S3ListPart *part = &(parts[i]); + char timebuf[256]; + if (data->noPrint) { + manager->etags[handlePartsStart + i] = strdup(part->eTag); + manager->next_etags_pos++; + manager->remaining = manager->remaining - part->size; + } else { + time_t t = (time_t)part->lastModified; + strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t)); + printf("%-30s", timebuf); + printf("%-15llu", (unsigned long long)part->partNumber); + printf("%-45s", part->eTag); + printf("%-15llu\n", (unsigned long long)part->size); + } + } + + data->partsCount += partsCount; + + return S3StatusOK; +} + +static int try_get_parts_info(const char *bucketName, const char *key, UploadManager *manager) { + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback}; + + list_parts_callback_data data; + + memset(&data, 0, sizeof(list_parts_callback_data)); + + data.partsCount = 0; + data.allDetails = 0; + data.manager = manager; + data.noPrint = 1; + do { + data.isTruncated = 0; + do { + S3_list_parts(&bucketContext, key, data.nextPartNumberMarker, manager->upload_id, 0, 0, 0, timeoutMsG, + &listPartsHandler, &data); + } while (S3_status_is_retryable(data.status) && should_retry()); + if (data.status != S3StatusOK) { + break; + } + } while (data.isTruncated); + + if (data.status == S3StatusOK) { + if (!data.partsCount) { + // printListMultipartHeader(data.allDetails); + } + } else { + s3PrintError(__func__, data.status, data.err_msg); + return -1; + } + + return 0; +} +*/ +int32_t s3PutObjectFromFile2(const char *file, const char *object) { + int32_t code = 0; + const char *key = object; + // const char *uploadId = 0; + const char *filename = 0; + uint64_t contentLength = 0; + const char *cacheControl = 0, *contentType = 0, *md5 = 0; + const char *contentDispositionFilename = 0, *contentEncoding = 0; + int64_t expires = -1; + S3CannedAcl cannedAcl = S3CannedAclPrivate; + int metaPropertiesCount = 0; + S3NameValue metaProperties[S3_MAX_METADATA_COUNT]; + char useServerSideEncryption = 0; + int noStatus = 0; + put_object_callback_data data; + + // data.infile = 0; + data.infileFD = NULL; + data.gb = 0; + data.noStatus = noStatus; + + if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { + vError("ERROR: %s Failed to stat file %s: ", __func__, file); + code = TAOS_SYSTEM_ERROR(errno); + return code; + } + + if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) { + vError("ERROR: %s Failed to open file %s: ", __func__, file); + code = TAOS_SYSTEM_ERROR(errno); + return code; + } + + data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = + contentLength; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3PutProperties putProperties = {contentType, md5, + cacheControl, contentDispositionFilename, + contentEncoding, expires, + cannedAcl, metaPropertiesCount, + metaProperties, useServerSideEncryption}; + + if (contentLength <= MULTIPART_CHUNK_SIZE) { + S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &putObjectDataCallback}; + + do { + S3_put_object(&bucketContext, key, contentLength, &putProperties, 0, 0, &putObjectHandler, &data); + } while (S3_status_is_retryable(data.status) && should_retry()); + + if (data.infileFD) { + taosCloseFile(&data.infileFD); + } else if (data.gb) { + growbuffer_destroy(data.gb); + } + + if (data.status != S3StatusOK) { + s3PrintError(__func__, data.status, data.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + } else if (data.contentLength) { + vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, + (unsigned long long)data.contentLength); + code = TAOS_SYSTEM_ERROR(EIO); + } + } else { + uint64_t totalContentLength = contentLength; + uint64_t todoContentLength = contentLength; + UploadManager manager; + manager.upload_id = 0; + manager.gb = 0; + + // div round up + int seq; + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8; + int totalSeq = ((contentLength + chunk_size - 1) / chunk_size); + + MultipartPartData partData; + memset(&partData, 0, sizeof(MultipartPartData)); + int partContentLength = 0; + + S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &initial_multipart_callback}; + + S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, + &putObjectDataCallback}; + + S3MultipartCommitHandler commit_handler = { + {&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + + manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq); + manager.next_etags_pos = 0; + /* + if (uploadId) { + manager.upload_id = strdup(uploadId); + manager.remaining = contentLength; + if (!try_get_parts_info(tsS3BucketName, key, &manager)) { + fseek(data.infile, -(manager.remaining), 2); + taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END); + contentLength = manager.remaining; + goto upload; + } else { + goto clean; + } + } + */ + do { + S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + + if (manager.upload_id == 0 || manager.status != S3StatusOK) { + s3PrintError(__func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + upload: + todoContentLength -= chunk_size * manager.next_etags_pos; + for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) { + partData.manager = &manager; + partData.seq = seq; + if (partData.put_object_data.gb == NULL) { + partData.put_object_data = data; + } + partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength); + // printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength); + partData.put_object_data.contentLength = partContentLength; + partData.put_object_data.originalContentLength = partContentLength; + partData.put_object_data.totalContentLength = todoContentLength; + partData.put_object_data.totalOriginalContentLength = totalContentLength; + putProperties.md5 = 0; + do { + S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id, + partContentLength, 0, timeoutMsG, &partData); + } while (S3_status_is_retryable(partData.status) && should_retry()); + if (partData.status != S3StatusOK) { + s3PrintError(__func__, partData.status, partData.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + contentLength -= chunk_size; + todoContentLength -= chunk_size; + } + + int i; + int size = 0; + size += growbuffer_append(&(manager.gb), "", strlen("")); + char buf[256]; + int n; + for (i = 0; i < totalSeq; i++) { + n = snprintf(buf, sizeof(buf), + "%d" + "%s", + i + 1, manager.etags[i]); + size += growbuffer_append(&(manager.gb), buf, n); + } + size += growbuffer_append(&(manager.gb), "", strlen("")); + manager.remaining = size; + + do { + S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0, + timeoutMsG, &manager); + } while (S3_status_is_retryable(manager.status) && should_retry()); + if (manager.status != S3StatusOK) { + s3PrintError(__func__, manager.status, manager.err_msg); + code = TAOS_SYSTEM_ERROR(EIO); + goto clean; + } + + clean: + if (manager.upload_id) { + taosMemoryFree(manager.upload_id); + } + for (i = 0; i < manager.next_etags_pos; i++) { + taosMemoryFree(manager.etags[i]); + } + growbuffer_destroy(manager.gb); + taosMemoryFree(manager.etags); + } + + return code; +} + +typedef struct list_bucket_callback_data { + char err_msg[128]; + S3Status status; + int isTruncated; + char nextMarker[1024]; + int keyCount; + int allDetails; + SArray *objectArray; +} list_bucket_callback_data; + +static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount, + const S3ListBucketContent *contents, int commonPrefixesCount, + const char **commonPrefixes, void *callbackData) { + list_bucket_callback_data *data = (list_bucket_callback_data *)callbackData; + + data->isTruncated = isTruncated; + if ((!nextMarker || !nextMarker[0]) && contentsCount) { + nextMarker = contents[contentsCount - 1].key; + } + if (nextMarker) { + snprintf(data->nextMarker, sizeof(data->nextMarker), "%s", nextMarker); + } else { + data->nextMarker[0] = 0; + } + + if (contentsCount && !data->keyCount) { + // printListBucketHeader(data->allDetails); + } + + int i; + for (i = 0; i < contentsCount; ++i) { + const S3ListBucketContent *content = &(contents[i]); + // printf("%-50s", content->key); + char *object_key = strdup(content->key); + taosArrayPush(data->objectArray, &object_key); + } + data->keyCount += contentsCount; + + for (i = 0; i < commonPrefixesCount; i++) { + // printf("\nCommon Prefix: %s\n", commonPrefixes[i]); + } + + return S3StatusOK; +} + +static void s3FreeObjectKey(void *pItem) { + char *key = *(char **)pItem; + taosMemoryFree(key); +} + +void s3DeleteObjectsByPrefix(const char *prefix) { + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &listBucketCallback}; + + const char *marker = 0, *delimiter = 0; + int maxkeys = 0, allDetails = 0; + list_bucket_callback_data data; + data.objectArray = taosArrayInit(32, POINTER_BYTES); + if (!data.objectArray) { + vError("%s: %s", __func__, "out of memoty"); + return; + } + if (marker) { + snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker); + } else { + data.nextMarker[0] = 0; + } + data.keyCount = 0; + data.allDetails = allDetails; + + do { + data.isTruncated = 0; + do { + S3_list_bucket(&bucketContext, prefix, data.nextMarker, delimiter, maxkeys, 0, timeoutMsG, &listBucketHandler, + &data); + } while (S3_status_is_retryable(data.status) && should_retry()); + if (data.status != S3StatusOK) { + break; + } + } while (data.isTruncated && (!maxkeys || (data.keyCount < maxkeys))); + + if (data.status == S3StatusOK) { + if (data.keyCount > 0) { + // printListBucketHeader(allDetails); + s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray)); + } + } else { + s3PrintError(__func__, data.status, data.err_msg); + } + + taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); +} + +void s3DeleteObjects(const char *object_name[], int nobject) { + int status = 0; + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; + + for (int i = 0; i < nobject; ++i) { + TS3SizeCBD cbd = {0}; + do { + S3_delete_object(&bucketContext, object_name[i], 0, timeoutMsG, &responseHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { + s3PrintError(__func__, cbd.status, cbd.err_msg); + } + } +} + +static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { + TS3SizeCBD *cbd = callbackData; + if (cbd->content_length != bufferSize) { + cbd->status = S3StatusAbortedByCallback; + return S3StatusAbortedByCallback; + } + + char *buf = taosMemoryCalloc(1, bufferSize); + if (buf) { + memcpy(buf, buffer, bufferSize); + cbd->buf = buf; + cbd->status = S3StatusOK; + return S3StatusOK; + } else { + cbd->status = S3StatusAbortedByCallback; + return S3StatusAbortedByCallback; + } +} + +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { + int status = 0; + int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; + const char *ifMatch = 0, *ifNotMatch = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; + S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &getObjectDataCallback}; + + TS3SizeCBD cbd = {0}; + cbd.content_length = size; + do { + S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if (cbd.status != S3StatusOK) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + return TAOS_SYSTEM_ERROR(EIO); + } + + *ppBlock = cbd.buf; + + return 0; +} + +long s3Size(const char *object_name) { + long size = 0; + int status = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback}; + + TS3SizeCBD cbd = {0}; + do { + S3_head_object(&bucketContext, object_name, 0, 0, &responseHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + } + + size = cbd.content_length; + + return size; +} + +void s3EvictCache(const char *path, long object_size) {} + +#elif defined(USE_COS) -#ifdef USE_COS #include "cos_api.h" #include "cos_http_io.h" #include "cos_log.h" diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 9966347219..8fd98f64de 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1412,6 +1412,10 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, SToken* pColName, SDataType if (!checkColumnName(pCxt, pColName) || !checkComment(pCxt, pComment, false)) { return NULL; } + if (IS_VAR_DATA_TYPE(dataType.type) && dataType.bytes == 0) { + pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + return NULL; + } SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF); CHECK_OUT_OF_MEM(pCol); COPY_STRING_FORM_ID_TOKEN(pCol->colName, pColName); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4d212a1c3d..72293e2f8c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2185,11 +2185,12 @@ static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType d nodesDestroyNode((SNode*)pFunc); return TSDB_CODE_OUT_OF_MEMORY; } - if (TSDB_CODE_SUCCESS != getFuncInfo(pCxt, pFunc)) { + int32_t code = getFuncInfo(pCxt, pFunc); + if (TSDB_CODE_SUCCESS != code) { nodesClearList(pFunc->pParameterList); pFunc->pParameterList = NULL; nodesDestroyNode((SNode*)pFunc); - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pExpr)->aliasName); + return code; } *pCast = (SNode*)pFunc; return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 1b8bac4cbc..0ab8927bd0 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -126,7 +126,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_INVALID_FIRST_COLUMN: return "First column must be timestamp"; case TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN: - return "Invalid binary/nchar column length"; + return "Invalid column length for var length type"; case TSDB_CODE_PAR_INVALID_TAGS_NUM: return "Invalid number of tag columns"; case TSDB_CODE_PAR_INVALID_INTERNAL_PK: diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 392ac5d8b2..11f8df4c93 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -997,3 +997,15 @@ void taosCacheDestroyIter(SCacheIter *pIter) { taosMemoryFreeClear(pIter->pCurrent); taosMemoryFreeClear(pIter); } + +void taosCacheTryExtendLifeSpan(SCacheObj *pCacheObj, void **data) { + if (!pCacheObj || !(*data)) return; + + SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode)); + if (pNode->signature != pNode) return; + + if (!pNode->inTrashcan) { + atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); + uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime); + } +} diff --git a/source/util/src/version.c.in b/source/util/src/version.c.in index 71998e3321..ec6449a02f 100644 --- a/source/util/src/version.c.in +++ b/source/util/src/version.c.in @@ -2,6 +2,6 @@ char version[64] = "${TD_VER_NUMBER}"; char compatible_version[12] = "${TD_VER_COMPATIBLE}"; char gitinfo[48] = "${TD_VER_GIT}"; char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}"; -char buildinfo[64] = "Built at ${TD_VER_DATE}"; +char buildinfo[64] = "Built ${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} at ${TD_VER_DATE}"; void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; diff --git a/tests/docs-examples-test/test_R.sh b/tests/docs-examples-test/test_R.sh index 707ea02704..d56f8973f7 100755 --- a/tests/docs-examples-test/test_R.sh +++ b/tests/docs-examples-test/test_R.sh @@ -6,7 +6,7 @@ pgrep taosd || taosd >> /dev/null 2>&1 & pgrep taosadapter || taosadapter >> /dev/null 2>&1 & cd ../../docs/examples/R -wget -N https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.4/taos-jdbcdriver-3.2.4-dist.jar +wget -N https://maven.aliyun.com/repository/central/com/taosdata/jdbc/taos-jdbcdriver/3.2.5/taos-jdbcdriver-3.2.5-dist.jar jar_path=`find . -name taos-jdbcdriver-*-dist.jar` echo jar_path=$jar_path diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index fcd39092bd..6915f802c3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -176,8 +176,16 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True -,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1 -,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit.py -N 2 -n 1 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 2 -n 1 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-column.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-db.py -N 3 -n 3 +e ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeReplicate.py -M 3 -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index f5e426057e..94704b1c25 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -60,7 +60,7 @@ docker run \ -v /root/.cargo/git:/root/.cargo/git \ -v /root/go/pkg/mod:/root/go/pkg/mod \ -v /root/.cache/go-build:/root/.cache/go-build \ - -v /root/.cos-local.1:/root/.cos-local.1 \ + -v /root/.cos-local.1:/root/.cos-local.2 \ -v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \ -v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \ -v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \ @@ -89,7 +89,7 @@ docker run \ -v /root/.cargo/git:/root/.cargo/git \ -v /root/go/pkg/mod:/root/go/pkg/mod \ -v /root/.cache/go-build:/root/.cache/go-build \ - -v /root/.cos-local.1:/root/.cos-local.1 \ + -v /root/.cos-local.1:/root/.cos-local.2 \ -v ${REP_REAL_PATH}/enterprise/src/plugins/taosx/target:${REP_DIR}/enterprise/src/plugins/taosx/target \ -v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \ -v ${REP_REAL_PATH}/community/contrib/cJson/:${REP_DIR}/community/contrib/cJson \ diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index c4fc1ce654..d0ed1d874d 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -36,12 +36,12 @@ class TDSimClient: "locale": "en_US.UTF-8", "charset": "UTF-8", "asyncLog": "0", - "rpcDebugFlag": "143", + "rpcDebugFlag": "135", "tmrDebugFlag": "131", - "cDebugFlag": "143", - "uDebugFlag": "143", - "jniDebugFlag": "143", - "qDebugFlag": "143", + "cDebugFlag": "135", + "uDebugFlag": "135", + "jniDebugFlag": "135", + "qDebugFlag": "135", "supportVnodes": "1024", "enableQueryHb": "1", "telemetryReporting": "0", @@ -130,18 +130,18 @@ class TDDnode: "locale": "en_US.UTF-8", "charset": "UTF-8", "asyncLog": "0", - "mDebugFlag": "143", - "dDebugFlag": "143", - "vDebugFlag": "143", - "tqDebugFlag": "143", - "cDebugFlag": "143", - "jniDebugFlag": "143", - "qDebugFlag": "143", - "rpcDebugFlag": "143", + "mDebugFlag": "135", + "dDebugFlag": "135", + "vDebugFlag": "135", + "tqDebugFlag": "135", + "cDebugFlag": "135", + "jniDebugFlag": "135", + "qDebugFlag": "135", + "rpcDebugFlag": "135", "tmrDebugFlag": "131", - "uDebugFlag": "143", - "sDebugFlag": "143", - "wDebugFlag": "143", + "uDebugFlag": "135", + "sDebugFlag": "135", + "wDebugFlag": "135", "numOfLogLines": "100000000", "statusInterval": "1", "enableQueryHb": "1", diff --git a/tests/system-test/2-query/varchar.py b/tests/system-test/2-query/varchar.py index f0849010c6..debcd1f95e 100644 --- a/tests/system-test/2-query/varchar.py +++ b/tests/system-test/2-query/varchar.py @@ -76,6 +76,12 @@ class TDTestCase: for i in range(tdSql.queryRows): tdSql.checkData(i,0, data_ct1_c8[i]) + tdSql.error("create stable empty_col_stable(ts timestamp, c2 varchar(0)) tags(tg1 int)") + tdSql.error("create stable empty_col_stable(ts timestamp, c2 varchar(10)) tags(tg1 varchar(0))") + tdSql.error("create stable empty_col_stable(ts timestamp, c2 varchar(10)) tags(tg1 nchar(0))") + tdSql.error("create stable empty_col_stable(ts timestamp, c2 varchar(10)) tags(tg1 binary(0))") + tdSql.error("create stable empty_col_stable(ts timestamp, c2 varchar(10)) tags(tg1 varbinary(0))") + # tdSql.query("select c8 from ct4") # data_ct4 = [tdSql.getData(i,0) for i in range(tdSql.queryRows)] diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py index 3a972ff4e9..fb62110b14 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py @@ -146,9 +146,9 @@ class TDTestCase: # a11111=paraDict["dbNumbers"] # print(f"==================={dbNameIndex},{a11111}") threads.append(threading.Thread(target=clusterComCreate.createDeltedatabases, args=(newTdSql, dbNameIndex,repeatNumber,paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))) - + newTdSql2=tdCom.newTdSql() redbNameIndex = '%s%d'%(paraDict["dbName"],i+100) - threads.append(threading.Thread(target=clusterComCreate.createDeltedatabases, args=(newTdSql, redbNameIndex,1,paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))) + threads.append(threading.Thread(target=clusterComCreate.createDeltedatabases, args=(newTdSql2, redbNameIndex,1,paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']))) for tr in threads: tr.start() diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py new file mode 100644 index 0000000000..95c35363a8 --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -0,0 +1,213 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s where c2 >= 0 "%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + tdLog.info("create ctb1") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create ctb2") + paraDict2 = paraDict.copy() + paraDict2['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict2['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert ctb1 data") + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + + tdLog.info("insert ctb2 data") + pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict2) + pInsertThread.join() + pInsertThread1.join() + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectrowcnt / 2 >= resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(2) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + self.prepareTestEnv() + self.tmqCase1(False) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db.py b/tests/system-test/7-tmq/tmqVnodeSplit-db.py new file mode 100644 index 0000000000..1c2e867758 --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db.py @@ -0,0 +1,214 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from db") + queryString = "database %s"%(paraDict['dbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + tdLog.info("create ctb1") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create ctb2") + paraDict2 = paraDict.copy() + + paraDict2['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict2['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert ctb1 data") + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + + tdLog.info("insert ctb2 data") + pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict2) + pInsertThread.join() + pInsertThread1.join() + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectrowcnt / 2 >= resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(2) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + self.prepareTestEnv() + self.tmqCase1(False) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py similarity index 94% rename from tests/system-test/7-tmq/tmqVnodeSplit.py rename to tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py index c6cdc2bf83..c76188eee2 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py @@ -58,7 +58,7 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - tdCom.drop_all_db(); + tdCom.drop_all_db() tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) tdLog.info("create stb") @@ -112,13 +112,13 @@ class TDTestCase: 'tagPrefix': 't', 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], - 'ctbPrefix': 'ctb', + 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 60, + 'pollDelay': 120, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -157,7 +157,13 @@ class TDTestCase: tdLog.info("create ctb1") tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") + + tdLog.info("create ctb2") + paraDict['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert ctb1 data") pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) tmqCom.getStartConsumeNotifyFromTmqsim() @@ -169,11 +175,8 @@ class TDTestCase: # split vgroup self.splitVgroups() - tdLog.info("create ctb2") - paraDict['ctbPrefix'] = "ctbn" - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") + + tdLog.info("insert ctb2 data") pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict) pInsertThread.join() pInsertThread1.join() @@ -194,7 +197,6 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): - tdSql.prepare() self.prepareTestEnv() self.tmqCase1(True) self.prepareTestEnv() diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py new file mode 100644 index 0000000000..3214c2f5c4 --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -0,0 +1,213 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 120, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + tdLog.info("create ctb1") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create ctb2") + paraDict2 = paraDict.copy() + paraDict2['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict2['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert ctb1 data") + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + + tdLog.info("insert ctb2 data") + pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict2) + pInsertThread.join() + pInsertThread1.join() + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectrowcnt / 2 >= resultList[0] or expectrowcnt < resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(2) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + self.prepareTestEnv() + self.tmqCase1(False) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py new file mode 100644 index 0000000000..27d296ed0e --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py @@ -0,0 +1,213 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 60, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb ") + queryString = "stable %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + tdLog.info("create ctb1") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create ctb2") + paraDict2 = paraDict.copy() + paraDict2['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict2['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert ctb1 data") + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + + tdLog.info("insert ctb2 data") + pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict2) + pInsertThread.join() + pInsertThread1.join() + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectrowcnt / 2 >= resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(2) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + self.prepareTestEnv() + self.tmqCase1(False) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py new file mode 100644 index 0000000000..005bca70d6 --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -0,0 +1,298 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 30, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + # tdLog.info("create ctb") + # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdLog.info("insert data") + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def restartAndRemoveWal(self): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + os.system('rm -rf ' + dataPath) + tdLog.debug("dataPath:%s"%dataPath) + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def redistributeVgroups(self): + dnodesList = [] + tdSql.query("show dnodes") + for result in tdSql.queryResult: + dnodesList.append(result[0]) + print("dnodeList:",dnodesList) + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodesList.remove(result[0]) + vnodeId = result[1] + print("its all data",dnodesList) + # if self.replicaVar == 1: + # redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) + # else: + redistributeSql = f"redistribute vgroup {vnodeId} " + for vgdnode in dnodesList: + redistributeSql += f"dnode {vgdnode} " + print(redistributeSql) + + tdLog.debug(f"redistributeSql:{redistributeSql}") + tdSql.query(redistributeSql) + tdLog.debug("redistributeSql ok") + + + def tmqCaseStableSelect(self): + tdLog.printNoPrefix("======== test case 3 subscrib column start : ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stbn', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic3'] + tmqCom.initConsumerTable() + + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s where c2 > 0 "%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + time.sleep(1) + #restart dnode & remove wal + # self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups() + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + expectRows = 2 + resultList = tmqCom.selectConsumeResult(expectRows) + + time.sleep(6) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 3 subscrib column end ...... ") + + def tmqCaseDbname(self): + tdLog.printNoPrefix("======== test case 4 subscrib Dbname start: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stbn', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic4'] + tmqCom.initConsumerTable() + + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from database ") + queryString = "database %s "%(paraDict['dbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + time.sleep(1) + #restart dnode & remove wal + # self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups() + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + expectRows = 2 + resultList = tmqCom.selectConsumeResult(expectRows) + + time.sleep(6) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 4 subscrib Dbname end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCaseStableSelect() + self.prepareTestEnv() + self.tmqCaseDbname() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py new file mode 100644 index 0000000000..2dd756b788 --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py @@ -0,0 +1,272 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 30, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + # tdLog.info("create ctb") + # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdLog.info("insert data") + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def restartAndRemoveWal(self): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + os.system('rm -rf ' + dataPath) + tdLog.debug("dataPath:%s"%dataPath) + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def redistributeVgroups(self): + dnodesList = [] + tdSql.query("show dnodes") + for result in tdSql.queryResult: + dnodesList.append(result[0]) + print("dnodeList:",dnodesList) + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodesList.remove(result[0]) + vnodeId = result[1] + print("its all data",dnodesList) + # if self.replicaVar == 1: + # redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) + # else: + redistributeSql = f"redistribute vgroup {vnodeId} " + for vgdnode in dnodesList: + redistributeSql += f"dnode {vgdnode} " + print(redistributeSql) + + tdLog.debug(f"redistributeSql:{redistributeSql}") + tdSql.query(redistributeSql) + tdLog.debug("redistributeSql ok") + + def tmqCaseStable(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 30, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + tdLog.info("create ctb1") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create ctb2") + paraDict2 = paraDict.copy() + paraDict2['ctbPrefix'] = "ctb2" + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict2['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert data") + pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + # self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups() + + + tdLog.info("insert data") + pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict2) + pInsertThread.join() + pInsertThread1.join() + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectrowcnt / 2 >= resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + # tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(5) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCaseNtable(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName':'dbt'} + + ntbName = "ntb" + + topicNameList = ['topic2'] + tmqCom.initConsumerTable() + + sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName) + tdLog.info("create nomal table sql: %s"%sqlString) + tdSql.execute(sqlString) + + tdLog.info("create topics from nomal table") + queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query("flush database %s"%(paraDict['dbName'])) + #restart dnode & remove wal + # self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups() + + sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName) + tdLog.info("alter table sql: %s"%sqlString) + tdSql.error(sqlString) + expectRows = 0 + resultList = tmqCom.selectConsumeResult(expectRows) + time.sleep(5) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + + def run(self): + self.prepareTestEnv() + self.tmqCaseStable() + self.prepareTestEnv() + self.tmqCaseNtable() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index fa50e46853..aab94bc7a2 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -58,7 +58,7 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - tdCom.drop_all_db(); + tdCom.drop_all_db() tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) tdLog.info("create stb") @@ -102,7 +102,7 @@ class TDTestCase: tdSql.query("show dnodes") for result in tdSql.queryResult: dnodesList.append(result[0]) - + print("dnodeList:",dnodesList) tdSql.query("select * from information_schema.ins_vnodes") vnodeId = 0 for result in tdSql.queryResult: @@ -110,9 +110,16 @@ class TDTestCase: tdLog.debug("dnode is %d"%(result[0])) dnodesList.remove(result[0]) vnodeId = result[1] - break - redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) - tdLog.debug("redistributeSql:%s"%(redistributeSql)) + print("its all data",dnodesList) + # if self.replicaVar == 1: + # redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) + # else: + redistributeSql = f"redistribute vgroup {vnodeId} " + for vgdnode in dnodesList: + redistributeSql += f"dnode {vgdnode} " + print(redistributeSql) + + tdLog.debug(f"redistributeSql:{redistributeSql}") tdSql.query(redistributeSql) tdLog.debug("redistributeSql ok") @@ -179,7 +186,7 @@ class TDTestCase: tmqCom.getStartCommitNotifyFromTmqsim() #restart dnode & remove wal - self.restartAndRemoveWal() + # self.restartAndRemoveWal() # redistribute vgroup self.redistributeVgroups(); @@ -228,7 +235,7 @@ class TDTestCase: tdSql.execute(sqlString) tdSql.query("flush database %s"%(paraDict['dbName'])) #restart dnode & remove wal - self.restartAndRemoveWal() + # self.restartAndRemoveWal() # redistribute vgroup self.redistributeVgroups(); @@ -236,7 +243,8 @@ class TDTestCase: sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName) tdLog.info("alter table sql: %s"%sqlString) tdSql.error(sqlString) - + expectRows = 0 + resultList = tmqCom.selectConsumeResult(expectRows) time.sleep(1) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) @@ -284,7 +292,7 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb with filter") - queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + queryString = "select * from %s.%s where c2 > 0 "%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicNameList[0], queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) @@ -305,24 +313,24 @@ class TDTestCase: time.sleep(5) #restart dnode & remove wal - self.restartAndRemoveWal() + # self.restartAndRemoveWal() # redistribute vgroup - self.redistributeVgroups(); + self.redistributeVgroups() tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tdLog.info("wait the consume result") - - time.sleep(10) + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + time.sleep(20) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) tdLog.printNoPrefix("======== test case 3 end ...... ") def run(self): - - tdSql.prepare() self.prepareTestEnv() self.tmqCase1() self.tmqCase2()