From 636c65d62f180b88d568648c3a9d823c22632e3b Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Wed, 11 Sep 2024 14:30:34 +0800 Subject: [PATCH] enh(query)[TS-4661]. Add AVX2 support for decompression and MIN/MAX comparison - Implement AVX2 instructions to decompress float and double data types - Add AVX2-based MIN/MAX comparison for both integer and floating-point data types - Include unit tests to verify functionality and performance improvements --- cmake/cmake.define | 19 +- include/util/tcompression.h | 16 +- source/common/src/tglobal.c | 8 +- .../libs/function/src/detail/tavgfunction.c | 328 ++-------------- source/libs/function/src/detail/tminmax.c | 365 ++++++----------- source/os/src/osEnv.c | 2 +- source/os/src/osMemory.c | 2 +- source/util/src/tcompression.c | 369 ++++++++++-------- source/util/src/tdecompress.c | 155 +++++++- source/util/test/CMakeLists.txt | 14 +- source/util/test/decompressTest.cpp | 285 +++++++++++--- 11 files changed, 757 insertions(+), 806 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index eb78b54cae..fd8f42de2f 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -177,16 +177,17 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma") ENDIF() - IF (COMPILER_SUPPORT_AVX) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") - ENDIF() - IF (COMPILER_SUPPORT_AVX2) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") - ENDIF() - MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") + MESSAGE(STATUS "FMA instructions is ACTIVATED") ENDIF() + IF (COMPILER_SUPPORT_AVX) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") + ENDIF() + IF (COMPILER_SUPPORT_AVX2) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") + ENDIF() + MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED") 
IF ("${SIMD_AVX512_SUPPORT}" MATCHES "true") IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index fef6c0713c..d32d20b727 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -152,11 +152,15 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int // for internal usage int32_t getWordLength(char type); +#ifdef __AVX2__ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); -void tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); -void tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output); +int32_t tsDecompressDoubleImpAvx2(const char *input, int32_t nelements, char *output); +void tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian); +#endif +#ifdef __AVX512VL__ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian); -void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); +#endif /************************************************************************* * REGULAR COMPRESSION 2 @@ -213,8 +217,8 @@ typedef int32_t (*__data_compress_init)(char *lossyColumns, float fPrecision, do uint32_t intervals, int32_t ifAdtFse, const char *compressor); typedef int32_t (*__data_compress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output, const char type); -typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output, - const char type); +typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, int32_t ninput, const int32_t nelements, + char *const output, 
const char type); typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output, int32_t outputSize, const char type, int8_t level); @@ -289,4 +293,4 @@ int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, u } #endif -#endif /*_TD_UTIL_COMPRESSION_H_*/ \ No newline at end of file +#endif /*_TD_UTIL_COMPRESSION_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b6fdc2c3c7..3c05294264 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -645,11 +645,6 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableCoreFile", tsEnableCoreFile, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "ssd42", tsSSE42Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx", tsAVXSupported, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx2", tsAVX2Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "fma", tsFMASupported, CFG_SCOPE_BOTH, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx512", tsAVX512Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "AVX512Enable", tsAVX512Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH, CFG_DYN_NONE)); @@ -1397,6 +1392,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable"); tsSIMDEnable = (bool)pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "AVX512Enable"); + tsAVX512Enable = (bool)pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tagFilterCache"); tsTagFilterCache = (bool)pItem->bval; diff --git 
a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index b1bef84511..7313fc82f7 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -92,254 +92,6 @@ out->sum.usum += val; \ } -static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(float); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - const float* p = plist; - - __m256 val; - __m256 sum = _mm256_setzero_ps(); - - for (int32_t i = 0; i < rounds; ++i) { - val = _mm256_loadu_ps(p); - sum = _mm256_add_ps(sum, val); - p += width; - } - - // let sum up the final results - const float* q = (const float*)&sum; - pRes->sum.dsum += q[0] + q[1] + q[2] + q[3] + q[4] + q[5] + q[6] + q[7]; - - int32_t startIndex = rounds * width; - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.dsum += plist[j + startIndex]; - } -#endif -} - -static void doubleVectorSumAVX(const double* plist, int32_t numOfRows, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - const double* p = plist; - - __m256d val; - __m256d sum = _mm256_setzero_pd(); - - for (int32_t i = 0; i < rounds; ++i) { - val = _mm256_loadu_pd(p); - sum = _mm256_add_pd(sum, val); - p += width; - } - - // let sum up the final results - const double* q = (const double*)&sum; - pRes->sum.dsum += q[0] + q[1] + q[2] + q[3]; - - int32_t startIndex = rounds * width; - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.dsum += plist[j + startIndex]; - } -#endif -} - -static void i8VectorSumAVX2(const int8_t* plist, int32_t 
numOfRows, int32_t type, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX2__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - __m256i sum = _mm256_setzero_si256(); - - if (type == TSDB_DATA_TYPE_TINYINT) { - const int8_t* p = plist; - - for (int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = _mm256_cvtepi8_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const int64_t* q = (const int64_t*)&sum; - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint8_t* p = (const uint8_t*)plist; - - for(int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = _mm256_cvtepu8_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const uint64_t* q = (const uint64_t*)&sum; - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint8_t)plist[j + rounds * width]; - } - } - -#endif -} - -static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX2__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - __m256i sum = _mm256_setzero_si256(); - - if (type == TSDB_DATA_TYPE_SMALLINT) { - const int16_t* p = plist; - - for (int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = 
_mm256_cvtepi16_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const int64_t* q = (const int64_t*)&sum; - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint16_t* p = (const uint16_t*)plist; - - for(int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = _mm256_cvtepu16_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const uint64_t* q = (const uint64_t*)&sum; - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint16_t)plist[j + rounds * width]; - } - } - -#endif -} - -static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX2__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - __m256i sum = _mm256_setzero_si256(); - - if (type == TSDB_DATA_TYPE_INT) { - const int32_t* p = plist; - - for (int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = _mm256_cvtepi32_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const int64_t* q = (const int64_t*)&sum; - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint32_t* p = (const uint32_t*)plist; - - for(int32_t i = 0; i < rounds; ++i) { - __m128i val = _mm_lddqu_si128((__m128i*)p); - __m256i extVal = 
_mm256_cvtepu32_epi64(val); // only four items will be converted into __m256i - sum = _mm256_add_epi64(sum, extVal); - p += width; - } - - // let sum up the final results - const uint64_t* q = (const uint64_t*)&sum; - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint32_t)plist[j + rounds * width]; - } - } - -#endif -} - -static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* pRes) { - const int32_t bitWidth = 256; - -#if __AVX2__ - // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth >> 3u) / sizeof(int64_t); - - int32_t remainder = numOfRows % width; - int32_t rounds = numOfRows / width; - - __m256i sum = _mm256_setzero_si256(); - - const int64_t* p = plist; - - for (int32_t i = 0; i < rounds; ++i) { - __m256i val = _mm256_lddqu_si256((__m256i*)p); - sum = _mm256_add_epi64(sum, val); - p += width; - } - - // let sum up the final results - const int64_t* q = (const int64_t*)&sum; - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - -#endif -} - int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); } bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { @@ -561,23 +313,16 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem = pInput->numOfRows; pAvgRes->count += pInput->numOfRows; - bool simdAvailable = tsAVXSupported && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE); - switch(type) { case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: { const int8_t* plist = (const int8_t*) pCol->pData; - // 1. 
If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { - i8VectorSumAVX2(plist, numOfRows, type, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (type == TSDB_DATA_TYPE_TINYINT) { - CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) - } else { - CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint8_t)plist[i]) - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (type == TSDB_DATA_TYPE_TINYINT) { + CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) + } else { + CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint8_t)plist[i]) } } break; @@ -587,16 +332,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_SMALLINT: { const int16_t* plist = (const int16_t*)pCol->pData; - // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { - i16VectorSumAVX2(plist, numOfRows, type, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (type == TSDB_DATA_TYPE_SMALLINT) { - CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) - } else { - CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint16_t)plist[i]) - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (type == TSDB_DATA_TYPE_SMALLINT) { + CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) + } else { + CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint16_t)plist[i]) } } break; @@ -606,16 +346,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_INT: { const int32_t* plist = (const int32_t*) pCol->pData; - // 1. 
If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { - i32VectorSumAVX2(plist, numOfRows, type, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (type == TSDB_DATA_TYPE_INT) { - CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) - } else { - CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint32_t)plist[i]) - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (type == TSDB_DATA_TYPE_INT) { + CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) + } else { + CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint32_t)plist[i]) } } break; @@ -625,16 +360,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_BIGINT: { const int64_t* plist = (const int64_t*) pCol->pData; - // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) { - i64VectorSumAVX2(plist, numOfRows, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (type == TSDB_DATA_TYPE_BIGINT) { - CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) - } else { - CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint64_t)plist[i]) - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (type == TSDB_DATA_TYPE_BIGINT) { + CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]) + } else { + CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint64_t)plist[i]) } } break; @@ -643,26 +373,16 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_FLOAT: { const float* plist = (const float*) pCol->pData; - // 1. 
If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { - floatVectorSumAVX(plist, numOfRows, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - pAvgRes->sum.dsum += plist[i]; - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + pAvgRes->sum.dsum += plist[i]; } break; } case TSDB_DATA_TYPE_DOUBLE: { const double* plist = (const double*)pCol->pData; - // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { - doubleVectorSumAVX(plist, numOfRows, pAvgRes); - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - pAvgRes->sum.dsum += plist[i]; - } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + pAvgRes->sum.dsum += plist[i]; } break; } diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index e3c12e9a57..69c1a8a6dd 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -72,6 +72,7 @@ #define GET_INVOKE_INTRINSIC_THRESHOLD(_bits, _bytes) ((_bits) / ((_bytes) << 3u)) +#ifdef __AVX2__ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) { const int32_t bitWidth = 256; @@ -81,224 +82,104 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder } #define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \ - (_v) = TMAX((_first)[0], (_first)[1]); \ - for (int32_t k = 1; k < (_width); ++k) { \ - (_v) = TMAX((_v), (_first)[k]); \ - } \ - \ - for (int32_t j = 0; j < (_remain); ++j) { \ - if ((_v) < (_sec)[j]) { \ - (_v) = (_sec)[j]; \ - } \ - } + __COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \ + __COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec)) #define EXTRACT_MIN_VAL(_first, _sec, 
_width, _remain, _v) \ - (_v) = TMIN((_first)[0], (_first)[1]); \ - for (int32_t k = 1; k < (_width); ++k) { \ - (_v) = TMIN((_v), (_first)[k]); \ - } \ - \ - for (int32_t j = 0; j < (_remain); ++j) { \ - if ((_v) > (_sec)[j]) { \ - (_v) = (_sec)[j]; \ - } \ - } + __COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \ + __COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec)) -static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) { - int8_t v = 0; +#define CMP_TYPE_MIN_MAX(type, cmp) \ + const type* p = pData; \ + __m256i initVal = _mm256_lddqu_si256((__m256i*)p); \ + p += width; \ + for (int32_t i = 1; i < (rounds); ++i) { \ + __m256i next = _mm256_lddqu_si256((__m256i*)p); \ + initVal = CMP_FUNC_##cmp##_##type(initVal, next); \ + p += width; \ + } \ + const type* q = (const type*)&initVal; \ + type* v = (type*)res; \ + EXTRACT_##cmp##_VAL(q, p, width, remain, *v) + +static void i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) { const int8_t* p = pData; int32_t width, remain, rounds; calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width); -#if __AVX2__ - __m256i next; - __m256i initVal = _mm256_lddqu_si256((__m256i*)p); - p += width; +#define CMP_FUNC_MIN_int8_t _mm256_min_epi8 +#define CMP_FUNC_MAX_int8_t _mm256_max_epi8 +#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8 +#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8 if (!isMinFunc) { // max function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epi8(initVal, next); - p += width; - } - - const int8_t* q = (const int8_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) - } else { // unsigned value - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epu8(initVal, next); - p += width; - } - - const uint8_t* q = (const uint8_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + 
CMP_TYPE_MIN_MAX(int8_t, MAX); + } else { + CMP_TYPE_MIN_MAX(uint8_t, MAX); } - } else { // min function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epi8(initVal, next); - p += width; - } - - // let sum up the final results - const int8_t* q = (const int8_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(int8_t, MIN); } else { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epu8(initVal, next); - p += width; - } - - // let sum up the final results - const uint8_t* q = (const uint8_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(uint8_t, MIN); } } -#endif - - return v; } -static int16_t i16VectorCmpAVX2(const int16_t* pData, int32_t numOfRows, bool isMinFunc, bool signVal) { - int16_t v = 0; - const int16_t* p = pData; - +static void i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) { int32_t width, remain, rounds; calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width); -#if __AVX2__ - __m256i next; - __m256i initVal = _mm256_lddqu_si256((__m256i*)p); - p += width; - +#define CMP_FUNC_MIN_int16_t _mm256_min_epi16 +#define CMP_FUNC_MAX_int16_t _mm256_max_epi16 +#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16 +#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16 if (!isMinFunc) { // max function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epi16(initVal, next); - p += width; - } - - // let sum up the final results - const int16_t* q = (const int16_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(int16_t, MAX); } else { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epu16(initVal, next); - p += width; - } - - // let sum up the final results - const uint16_t* q = (const 
uint16_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(uint16_t, MAX); } - } else { // min function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epi16(initVal, next); - p += width; - } - - // let sum up the final results - const int16_t* q = (const int16_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(int16_t, MIN); } else { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epi16(initVal, next); - p += width; - } - - // let sum up the final results - const uint16_t* q = (const uint16_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(uint16_t, MIN); } } -#endif - - return v; } -static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool isMinFunc, bool signVal) { - int32_t v = 0; - const int32_t* p = pData; - +static void i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) { int32_t width, remain, rounds; calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width); -#if __AVX2__ - __m256i next; - __m256i initVal = _mm256_lddqu_si256((__m256i*)p); - p += width; - +#define CMP_FUNC_MIN_int32_t _mm256_min_epi32 +#define CMP_FUNC_MAX_int32_t _mm256_max_epi32 +#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32 +#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32 if (!isMinFunc) { // max function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epi32(initVal, next); - p += width; - } - - // let compare the final results - const int32_t* q = (const int32_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) - } else { // unsigned value - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_max_epi32(initVal, next); - p += width; - } - - // let compare the final results - const uint32_t* 
q = (const uint32_t*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(int32_t, MAX); + } else { + CMP_TYPE_MIN_MAX(uint32_t, MAX); } } else { // min function if (signVal) { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epi32(initVal, next); - p += width; - } - - // let sum up the final results - const int32_t* q = (const int32_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(int32_t, MIN); } else { - for (int32_t i = 0; i < rounds; ++i) { - next = _mm256_lddqu_si256((__m256i*)p); - initVal = _mm256_min_epu32(initVal, next); - p += width; - } - - // let sum up the final results - const uint32_t* q = (const uint32_t*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + CMP_TYPE_MIN_MAX(uint32_t, MIN); } } -#endif - - return v; } -static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) { - float v = 0; +static void floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) { const float* p = pData; int32_t width, remain, rounds; calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width); -#if __AVX__ - __m256 next; __m256 initVal = _mm256_loadu_ps(p); p += width; @@ -311,7 +192,7 @@ static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMin } const float* q = (const float*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + EXTRACT_MAX_VAL(q, p, width, remain, *res) } else { // min function for (int32_t i = 1; i < rounds; ++i) { next = _mm256_loadu_ps(p); @@ -320,22 +201,16 @@ static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMin } const float* q = (const float*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + EXTRACT_MIN_VAL(q, p, width, remain, *res) } -#endif - - return v; } -static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool isMinFunc) { - double v = 0; +static void doubleVectorCmpAVX2(const double* pData, 
int32_t numOfRows, bool isMinFunc, double* res) { const double* p = pData; int32_t width, remain, rounds; calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width); -#if __AVX__ - __m256d next; __m256d initVal = _mm256_loadu_pd(p); p += width; @@ -349,7 +224,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is // let sum up the final results const double* q = (const double*)&initVal; - EXTRACT_MAX_VAL(q, p, width, remain, v) + EXTRACT_MAX_VAL(q, p, width, remain, *res) } else { // min function for (int32_t i = 1; i < rounds; ++i) { next = _mm256_loadu_pd(p); @@ -359,12 +234,10 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is // let sum up the final results const double* q = (const double*)&initVal; - EXTRACT_MIN_VAL(q, p, width, remain, v) + EXTRACT_MIN_VAL(q, p, width, remain, *res) } -#endif - - return v; } +#endif static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows, bool isStr) { int32_t i = start; @@ -378,14 +251,17 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { - // AVX2 version to speedup the loop - if (tsAVX2Supported && tsSIMDEnable) { - pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); - } else { - if (!pBuf->assign) { - pBuf->v = ((int8_t*)data)[start]; - } + if (!pBuf->assign) { + pBuf->v = ((const int8_t*)data)[start]; + } +#ifdef __AVX2__ + if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= sizeof(__m256i)) { + i8VectorCmpAVX2(data + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v); + } else { +#else + if (true) { +#endif if (signVal) { const int8_t* p = (const int8_t*)data; int8_t* v = (int8_t*)&pBuf->v; @@ -412,14 +288,17 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM static void 
handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { - // AVX2 version to speedup the loop - if (tsAVX2Supported && tsSIMDEnable) { - pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); - } else { - if (!pBuf->assign) { - pBuf->v = ((int16_t*)data)[start]; - } + if (!pBuf->assign) { + pBuf->v = ((const int16_t*)data)[start]; + } +#ifdef __AVX2__ + if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= sizeof(__m256i)) { + i16VectorCmpAVX2(data + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v); + } else { +#else + if (true) { +#endif if (signVal) { const int16_t* p = (const int16_t*)data; int16_t* v = (int16_t*)&pBuf->v; @@ -446,14 +325,17 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { - // AVX2 version to speedup the loop - if (tsAVX2Supported && tsSIMDEnable) { - pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); - } else { - if (!pBuf->assign) { - pBuf->v = ((int32_t*)data)[start]; - } + if (!pBuf->assign) { + pBuf->v = ((const int32_t*)data)[start]; + } +#ifdef __AVX2__ + if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= sizeof(__m256i)) { + i32VectorCmpAVX2(data + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v); + } else { +#else + if (true) { +#endif if (signVal) { const int32_t* p = (const int32_t*)data; int32_t* v = (int32_t*)&pBuf->v; @@ -481,7 +363,7 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { if (!pBuf->assign) { - pBuf->v = ((int64_t*)data)[start]; + pBuf->v = ((const int64_t*)data)[start]; } if (signVal) { @@ -503,33 +385,29 @@ static void 
handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S __COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p); } } + + pBuf->assign = true; } static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc) { float* pData = (float*)pCol->pData; float* val = (float*)&pBuf->v; + if (!pBuf->assign) { + *val = pData[start]; + } - // AVX version to speedup the loop - if (tsAVXSupported && tsSIMDEnable) { - *val = floatVectorCmpAVX(pData, numOfRows, isMinFunc); +#ifdef __AVX2__ + if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(float) >= sizeof(__m256i)) { + floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val); } else { - if (!pBuf->assign) { - *val = pData[start]; - } - +#else + if (true) { +#endif if (isMinFunc) { // min - for (int32_t i = start; i < start + numOfRows; ++i) { - if (*val > pData[i]) { - *val = pData[i]; - } - } + __COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData); } else { // max - for (int32_t i = start; i < start + numOfRows; ++i) { - if (*val < pData[i]) { - *val = pData[i]; - } - } + __COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData); } } @@ -540,27 +418,21 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR bool isMinFunc) { double* pData = (double*)pCol->pData; double* val = (double*)&pBuf->v; + if (!pBuf->assign) { + *val = pData[start]; + } - // AVX version to speedup the loop - if (tsAVXSupported && tsSIMDEnable) { - *val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc); +#ifdef __AVX2__ + if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(double) >= sizeof(__m256i)) { + doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val); } else { - if (!pBuf->assign) { - *val = pData[start]; - } - +#else + if (true) { +#endif if (isMinFunc) { // min - for (int32_t i = start; i < start + numOfRows; ++i) { - if (*val > pData[i]) { - *val = pData[i]; - } - } + __COMPARE_EXTRACT_MIN(start, start + 
numOfRows, *val, pData); } else { // max - for (int32_t i = start; i < start + numOfRows; ++i) { - if (*val < pData[i]) { - *val = pData[i]; - } - } + __COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData); } } @@ -581,7 +453,7 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c } static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunctionCtx* pCtx, SMinmaxResInfo* pBuf, - bool isMinFunc) { + bool isMinFunc) { if (isMinFunc) { switch (pCol->info.type) { case TSDB_DATA_TYPE_BOOL: @@ -652,8 +524,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu if (colDataIsNull_var(pCol, i)) { continue; } - char *pLeft = (char *)colDataGetData(pCol, i); - char *pRight = (char *)pBuf->str; + char* pLeft = (char*)colDataGetData(pCol, i); + char* pRight = (char*)pBuf->str; int32_t ret = compareLenBinaryVal(pLeft, pRight); if (ret < 0) { @@ -674,8 +546,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu if (colDataIsNull_var(pCol, i)) { continue; } - char *pLeft = (char *)colDataGetData(pCol, i); - char *pRight = (char *)pBuf->str; + char* pLeft = (char*)colDataGetData(pCol, i); + char* pRight = (char*)pBuf->str; int32_t ret = compareLenPrefixedWStr(pLeft, pRight); if (ret < 0) { @@ -761,8 +633,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu if (colDataIsNull_var(pCol, i)) { continue; } - char *pLeft = (char *)colDataGetData(pCol, i); - char *pRight = (char *)pBuf->str; + char* pLeft = (char*)colDataGetData(pCol, i); + char* pRight = (char*)pBuf->str; int32_t ret = compareLenBinaryVal(pLeft, pRight); if (ret > 0) { @@ -784,8 +656,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu if (colDataIsNull_var(pCol, i)) { continue; } - char *pLeft = (char *)colDataGetData(pCol, i); - char *pRight = (char *)pBuf->str; + char* pLeft = (char*)colDataGetData(pCol, i); + char* pRight = 
(char*)pBuf->str; int32_t ret = compareLenPrefixedWStr(pLeft, pRight); if (ret > 0) { @@ -838,7 +710,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) // data in current data block are qualified to the query if (pInput->colDataSMAIsSet && !IS_STR_DATA_TYPE(type)) { - numOfElems = pInput->numOfRows - pAgg->numOfNull; if (numOfElems == 0) { goto _over; diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 14efa1b534..a3791eb026 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,7 +37,7 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDEnable = 0; +char tsSIMDEnable = 1; char tsAVX512Enable = 0; char tsSSE42Supported = 0; char tsAVXSupported = 0; diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 49a5c2a2a2..c084b76485 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -168,7 +168,7 @@ void startTrace() { Dwarf_Ptr errarg = 0; FILE *fp = fopen("/proc/self/maps", "r"); - fscanf(fp, "%lx-", &addr); + ret = fscanf(fp, "%lx-", &addr); fclose(fp); ret = dwarf_init_path("/proc/self/exe", NULL, 0, DW_GROUPNUMBER_ANY, NULL, errarg, &tDbg, NULL); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 288d440d86..9c9ded693e 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -64,29 +64,33 @@ #include "td_sz.h" int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + const char type); // delta int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressTimestampImp2(const char *const input, const int32_t 
nelements, char *const output, +int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, const char type); // simple8b int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + const char type); // bit int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type); -int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type); +int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + char const type); // double specail int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type); -int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type); +int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + char const type); int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output); int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output); int32_t l2ComressInitImpl_disabled(char 
*lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals, uint32_t intervals, int32_t ifAdtFse, const char *compressor) { @@ -457,8 +461,8 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) { int32_t word_length = getWordLength(type); - if (word_length == -1) { - return word_length; + if (word_length < 0) { + return -1; } // If not compressed. @@ -467,70 +471,106 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha return nelements * word_length; } -#if __AVX2__ - tsDecompressIntImpl_Hw(input, nelements, output, type); - return nelements * word_length; -#else +#ifdef __AVX512F__ + if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) { + tsDecompressIntImpl_Hw(input, nelements, output, type); + return nelements * word_length; + } +#endif + // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; const char *ip = input + 1; + char *op = output; int32_t count = 0; - int32_t _pos = 0; int64_t prev_value = 0; - while (1) { - if (count == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); + while (count < nelements) { + uint64_t w = *(uint64_t *)ip; char selector = (char)(w & INT64MASK(4)); // selector = 4 char bit = bit_per_integer[(int32_t)selector]; // bit = 3 int32_t elems = selector_to_elems[(int32_t)selector]; - for (int32_t i = 0; i < elems; i++) { - uint64_t zigzag_value; - - if (selector == 0 || selector == 1) { - zigzag_value = 0; - } else { - zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit)); + switch (type) { + case TSDB_DATA_TYPE_BIGINT: { + int64_t *out = (int64_t *)op; + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; ++i, 
++count, ++out) { + *out = prev_value; + } + } else { + uint64_t zigzag_value = 0; + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit)); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + *out = prev_value; + } + } + op = (char *)out; + break; } - int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value); - int64_t curr_value = diff + prev_value; - prev_value = curr_value; - - switch (type) { - case TSDB_DATA_TYPE_BIGINT: - *((int64_t *)output + _pos) = (int64_t)curr_value; - _pos++; - break; - case TSDB_DATA_TYPE_INT: - *((int32_t *)output + _pos) = (int32_t)curr_value; - _pos++; - break; - case TSDB_DATA_TYPE_SMALLINT: - *((int16_t *)output + _pos) = (int16_t)curr_value; - _pos++; - break; - case TSDB_DATA_TYPE_TINYINT: - *((int8_t *)output + _pos) = (int8_t)curr_value; - _pos++; - break; - default: - perror("Wrong integer types.\n"); - return -1; + case TSDB_DATA_TYPE_INT: { + int32_t *out = (int32_t *)op; + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + *out = (int32_t)prev_value; + } + } else { + uint64_t zigzag_value = 0; + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit)); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + *out = (int32_t)prev_value; + } + } + op = (char *)out; + break; } - count++; - if (count == nelements) break; + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *out = (int16_t *)op; + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + *out = (int16_t)prev_value; + } + } else { + uint64_t zigzag_value = 0; + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit)); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + *out = (int16_t)prev_value; + } + } + op = (char *)out; 
+ break; + } + case TSDB_DATA_TYPE_TINYINT: { + int8_t *out = (int8_t *)op; + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + *out = (int8_t)prev_value; + } + } else { + uint64_t zigzag_value = 0; + for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) { + zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit)); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + *out = (int8_t)prev_value; + } + } + op = (char *)out; + break; + } + default: + perror("Wrong integer types.\n"); + return -1; } ip += LONG_BYTES; } return nelements * word_length; -#endif } /* ----------------------------------------------Bool Compression ---------------------------------------------- */ @@ -590,7 +630,8 @@ int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, ch int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) { return tsCompressBoolImp(input, nelements, output); } -int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) { +int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + char const type) { return tsDecompressBoolImp(input, nelements, output); } @@ -602,18 +643,20 @@ int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, c } return TSDB_CODE_THIRDPARTY_ERROR; } -int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) { +int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + char const type) { if (type == TSDB_DATA_TYPE_FLOAT) { - return tsDecompressFloatImp(input, nelements, output); + return tsDecompressFloatImp(input, ninput, nelements, output); } else if (type == TSDB_DATA_TYPE_DOUBLE) { - return tsDecompressDoubleImp(input, 
nelements, output); + return tsDecompressDoubleImp(input, ninput, nelements, output); } return TSDB_CODE_THIRDPARTY_ERROR; } int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) { return tsCompressINTImp(input, nelements, output, type); } -int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) { +int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + const char type) { return tsDecompressINTImp(input, nelements, output, type); } @@ -824,67 +867,68 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement memcpy(output, input + 1, nelements * longBytes); return nelements * longBytes; } else if (input[0] == 1) { // Decompress - if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) { - tsDecompressTimestampAvx512(input, nelements, output, false); - } else if (tsSIMDEnable && tsAVX2Supported) { - tsDecompressTimestampAvx2(input, nelements, output, false); - } else { - int64_t *ostream = (int64_t *)output; +#ifdef __AVX512VL__ + if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) { + tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian); + return nelements * longBytes; + } +#endif - int32_t ipos = 1, opos = 0; - int8_t nbytes = 0; - int64_t prev_value = 0; - int64_t prev_delta = 0; - int64_t delta_of_delta = 0; + int64_t *ostream = (int64_t *)output; - while (1) { - uint8_t flags = input[ipos++]; - // Decode dd1 - uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; + int32_t ipos = 1, opos = 0; + int8_t nbytes = 0; + int64_t prev_value = 0; + int64_t prev_delta = 0; + int64_t delta_of_delta = 0; + + while (1) { + uint8_t flags = input[ipos++]; + // Decode dd1 + uint64_t dd1 = 0; + nbytes = flags & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; + } 
else { + if (is_bigendian()) { + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); } else { - if (is_bigendian()) { - memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd1, input + ipos, nbytes); - } - delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); + memcpy(&dd1, input + ipos, nbytes); } + delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); + } - ipos += nbytes; - if (opos == 0) { - prev_value = delta_of_delta; - prev_delta = 0; - ostream[opos++] = delta_of_delta; - } else { - prev_delta = delta_of_delta + prev_delta; - prev_value = prev_value + prev_delta; - ostream[opos++] = prev_value; - } - if (opos == nelements) return nelements * longBytes; - - // Decode dd2 - uint64_t dd2 = 0; - nbytes = (flags >> 4) & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; - } else { - if (is_bigendian()) { - memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd2, input + ipos, nbytes); - } - // zigzag_decoding - delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); - } - ipos += nbytes; + ipos += nbytes; + if (opos == 0) { + prev_value = delta_of_delta; + prev_delta = 0; + ostream[opos++] = delta_of_delta; + } else { prev_delta = delta_of_delta + prev_delta; prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; - if (opos == nelements) return nelements * longBytes; } + if (opos == nelements) return nelements * longBytes; + + // Decode dd2 + uint64_t dd2 = 0; + nbytes = (flags >> 4) & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; + } else { + if (is_bigendian()) { + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd2, input + ipos, nbytes); + } + // zigzag_decoding + delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); + } + ipos += nbytes; + prev_delta = delta_of_delta + prev_delta; + prev_value = prev_value + prev_delta; + ostream[opos++] = prev_value; + if (opos == nelements) return nelements * longBytes; } } @@ -897,7 +941,8 @@ 
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char memcpy(output + 1, input, bytes); return bytes + 1; } -int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type) { +int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, + const char type) { int32_t bytes = tDataTypes[type].bytes * nelements; memcpy(output, input + 1, bytes); return bytes; @@ -905,7 +950,7 @@ int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, cha int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) { return tsCompressTimestampImp(input, nelements, output); } -int32_t tsDecompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, +int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output, const char type) { return tsDecompressTimestampImp(input, nelements, output); } @@ -1023,17 +1068,10 @@ FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const return diff; } -int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output) { - // output stream - double *ostream = (double *)output; - - if (input[0] == 1) { - memcpy(output, input + 1, nelements * DOUBLE_BYTES); - return nelements * DOUBLE_BYTES; - } - +static int32_t tsDecompressDoubleImpHelper(const char *input, int32_t nelements, char *output) { + double *ostream = (double *)output; uint8_t flags = 0; - int32_t ipos = 1; + int32_t ipos = 0; int32_t opos = 0; uint64_t diff = 0; union { @@ -1048,7 +1086,7 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, flags = input[ipos++]; } - diff = decodeDoubleValue(input, &ipos, flags & 0x0f); + diff = decodeDoubleValue(input, &ipos, flags & INT8MASK(4)); flags >>= 4; curr.bits ^= diff; @@ 
-1058,6 +1096,25 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, return nelements * DOUBLE_BYTES; } +int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) { + // return the result directly if there is no compression + if (input[0] == 1) { + memcpy(output, input + 1, nelements * DOUBLE_BYTES); + return nelements * DOUBLE_BYTES; + } + +#ifdef __AVX2__ + // use AVX2 implementation when allowed and the compression ratio is not high + double compressRatio = 1.0 * nelements * DOUBLE_BYTES / ninput; + if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) { + return tsDecompressDoubleImpAvx2(input + 1, nelements, output); + } +#endif + + // use implementation without SIMD instructions by default + return tsDecompressDoubleImpHelper(input + 1, nelements, output); +} + /* --------------------------------------------Float Compression ---------------------------------------------- */ void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) { uint8_t nbytes = (flag & INT8MASK(3)) + 1; @@ -1166,49 +1223,50 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t return diff; } -static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float *ostream) { +static int32_t tsDecompressFloatImpHelper(const char *input, int32_t nelements, char *output) { + float *ostream = (float *)output; uint8_t flags = 0; - int32_t ipos = 1; + int32_t ipos = 0; int32_t opos = 0; - uint32_t prev_value = 0; + uint32_t diff = 0; + union { + uint32_t bits; + float real; + } curr; + + curr.bits = 0; for (int32_t i = 0; i < nelements; i++) { if (i % 2 == 0) { flags = input[ipos++]; } - uint8_t flag = flags & INT8MASK(4); + diff = decodeFloatValue(input, &ipos, flags & INT8MASK(4)); flags >>= 4; - - uint32_t diff = decodeFloatValue(input, &ipos, flag); - union { - uint32_t bits; - float real; - } curr; - - uint32_t 
predicted = prev_value; - curr.bits = predicted ^ diff; - prev_value = curr.bits; + curr.bits ^= diff; ostream[opos++] = curr.real; } + + return nelements * FLOAT_BYTES; } -int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) { +int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) { if (input[0] == 1) { memcpy(output, input + 1, nelements * FLOAT_BYTES); return nelements * FLOAT_BYTES; } - if (tsSIMDEnable && tsAVX2Supported) { - tsDecompressFloatImplAvx2(input, nelements, output); - } else if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) { - tsDecompressFloatImplAvx512(input, nelements, output); - } else { // alternative implementation without SIMD instructions. - tsDecompressFloatHelper(input, nelements, (float *)output); +#ifdef __AVX2__ + // use AVX2 implementation when allowed and the compression ratio is not high + double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput; + if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) { + return tsDecompressFloatImpAvx2(input + 1, nelements, output); } +#endif - return nelements * FLOAT_BYTES; + // use implementation without SIMD instructions by default + return tsDecompressFloatImpHelper(input + 1, nelements, output); } // @@ -1336,10 +1394,11 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 } else { // decompress lossless if (cmprAlg == ONE_STAGE_COMP) { - return tsDecompressFloatImp(pIn, nEle, pOut); + return tsDecompressFloatImp(pIn, nIn, nEle, pOut); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - return tsDecompressFloatImp(pBuf, nEle, pOut); + int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf); + if (bufLen < 0) return -1; + return tsDecompressFloatImp(pBuf, bufLen, nEle, pOut); } else { return TSDB_CODE_INVALID_PARA; } @@ -1373,10 +1432,11 @@ int32_t tsDecompressDouble(void *pIn, 
int32_t nIn, int32_t nEle, void *pOut, int } else { // decompress lossless if (cmprAlg == ONE_STAGE_COMP) { - return tsDecompressDoubleImp(pIn, nEle, pOut); + return tsDecompressDoubleImp(pIn, nIn, nEle, pOut); } else if (cmprAlg == TWO_STAGE_COMP) { - if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; - return tsDecompressDoubleImp(pBuf, nEle, pOut); + int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf); + if (bufLen < 0) return -1; + return tsDecompressDoubleImp(pBuf, bufLen, nEle, pOut); } else { return TSDB_CODE_INVALID_PARA; } @@ -1550,7 +1610,7 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } else { \ uTrace("dencode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled", \ tDataTypes[type].name); \ - return compressL1Dict[l1].decomprFn(pIn, nEle, pOut, type); \ + return compressL1Dict[l1].decomprFn(pIn, nIn, nEle, pOut, type); \ } \ } else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \ if (compress) { \ @@ -1562,8 +1622,9 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } else { \ uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \ tDataTypes[type].name); \ - if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \ - return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \ + int32_t bufLen = compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type); \ + if (bufLen < 0) return -1; \ + return compressL1Dict[l1].decomprFn(pBuf, bufLen, nEle, pOut, type); \ } \ } else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \ if (compress) { \ diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 598aca730f..60a1f1c938 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -40,6 +40,7 @@ int32_t getWordLength(char type) { return wordLength; } +#ifdef __AVX2__ int32_t tsDecompressIntImpl_Hw(const char *const 
input, const int32_t nelements, char *const output, const char type) { int32_t word_length = getWordLength(type); @@ -52,7 +53,6 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int32_t _pos = 0; int64_t prevValue = 0; -#if __AVX2__ || __AVX512F__ while (_pos < nelements) { uint64_t w = *(uint64_t *)ip; @@ -309,22 +309,145 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, ip += LONG_BYTES; } -#endif return nelements * word_length; } -void tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) { -#if __AVX512F__ - // todo add it -#endif - return; +#define M256_BYTES sizeof(__m256i) + +FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) { + __m256i dataVec = _mm256_load_si256((__m256i *)data); + __m256i flagVec = _mm256_load_si256((__m256i *)flag); + __m256i k7 = _mm256_set1_epi32(7); + __m256i lopart = _mm256_set_epi32(0, -1, 0, -1, 0, -1, 0, -1); + __m256i hipart = _mm256_set_epi32(-1, 0, -1, 0, -1, 0, -1, 0); + __m256i trTail = _mm256_cmpgt_epi32(flagVec, k7); + __m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi32(-1)); + __m256i shiftVec = _mm256_slli_epi32(_mm256_sub_epi32(_mm256_set1_epi32(3), _mm256_and_si256(flagVec, k7)), 3); + __m256i maskVec = hipart; + __m256i diffVec = _mm256_sllv_epi32(dataVec, _mm256_and_si256(shiftVec, maskVec)); + maskVec = _mm256_or_si256(trHead, lopart); + diffVec = _mm256_srlv_epi32(diffVec, _mm256_and_si256(shiftVec, maskVec)); + maskVec = _mm256_and_si256(trTail, lopart); + diffVec = _mm256_sllv_epi32(diffVec, _mm256_and_si256(shiftVec, maskVec)); + return diffVec; } -// todo add later -void tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { -#if __AVX2__ -#endif - return; +int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output) { + // Allocate memory-aligned buffer + char buf[M256_BYTES * 3]; + memset(buf, 0, 
sizeof(buf)); + char *data = (char *)ALIGN_NUM((uint64_t)buf, M256_BYTES); + char *flag = data + M256_BYTES; + const char *in = input; + char *out = output; + + // Load data into the buffer for batch processing + int32_t batchSize = M256_BYTES / FLOAT_BYTES; + int32_t idx = 0; + uint32_t cur = 0; + for (int32_t i = 0; i < nelements; i += 2) { + if (idx == batchSize) { + // Start processing when the buffer is full + __m256i resVec = decodeFloatAvx2(data, flag); + _mm256_storeu_si256((__m256i *)out, resVec); + uint32_t *p = (uint32_t *)out; + for (int32_t j = 0; j < batchSize; ++j) { + p[j] = cur = (p[j] ^ cur); + } + out += M256_BYTES; + idx = 0; + } + uint8_t flag1 = (*in) & 0xF; + uint8_t flag2 = ((*in) >> 4) & 0xF; + int32_t nbytes1 = (flag1 & 0x7) + 1; + int32_t nbytes2 = (flag2 & 0x7) + 1; + in++; + flag[idx * FLOAT_BYTES] = flag1; + flag[(idx + 1) * FLOAT_BYTES] = flag2; + memcpy(data + (idx + 1) * FLOAT_BYTES - nbytes1, in, nbytes1 + nbytes2); + in += nbytes1 + nbytes2; + idx += 2; + } + if (idx) { + idx -= (nelements & 0x1); + // Process the remaining few bytes + __m256i resVec = decodeFloatAvx2(data, flag); + memcpy(out, &resVec, idx * FLOAT_BYTES); + uint32_t *p = (uint32_t *)out; + for (int32_t j = 0; j < idx; ++j) { + p[j] = cur = (p[j] ^ cur); + } + out += idx * FLOAT_BYTES; + } + return (int32_t)(out - output); +} + +FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) { + __m256i dataVec = _mm256_load_si256((__m256i *)data); + __m256i flagVec = _mm256_load_si256((__m256i *)flag); + __m256i k7 = _mm256_set1_epi64x(7); + __m256i lopart = _mm256_set_epi64x(0, -1, 0, -1); + __m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0); + __m256i trTail = _mm256_cmpgt_epi64(flagVec, k7); + __m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1)); + __m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3); + __m256i maskVec = hipart; + __m256i diffVec = _mm256_sllv_epi64(dataVec, 
_mm256_and_si256(shiftVec, maskVec)); + maskVec = _mm256_or_si256(trHead, lopart); + diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec)); + maskVec = _mm256_and_si256(trTail, lopart); + diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec)); + return diffVec; +} + +int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, char *const output) { + // Allocate memory-aligned buffer + char buf[M256_BYTES * 3]; + memset(buf, 0, sizeof(buf)); + char *data = (char *)ALIGN_NUM((uint64_t)buf, M256_BYTES); + char *flag = data + M256_BYTES; + const char *in = input; + char *out = output; + + // Load data into the buffer for batch processing + int32_t batchSize = M256_BYTES / DOUBLE_BYTES; + int32_t idx = 0; + uint64_t cur = 0; + for (int32_t i = 0; i < nelements; i += 2) { + if (idx == batchSize) { + // Start processing when the buffer is full + __m256i resVec = decodeDoubleAvx2(data, flag); + _mm256_storeu_si256((__m256i *)out, resVec); + uint64_t *p = (uint64_t *)out; + for (int32_t j = 0; j < batchSize; ++j) { + p[j] = cur = (p[j] ^ cur); + } + out += M256_BYTES; + idx = 0; + } + uint8_t flag1 = (*in) & 0xF; + uint8_t flag2 = ((*in) >> 4) & 0xF; + int32_t nbytes1 = (flag1 & 0x7) + 1; + int32_t nbytes2 = (flag2 & 0x7) + 1; + in++; + flag[idx * DOUBLE_BYTES] = flag1; + flag[(idx + 1) * DOUBLE_BYTES] = flag2; + memcpy(data + (idx + 1) * DOUBLE_BYTES - nbytes1, in, nbytes1 + nbytes2); + in += nbytes1 + nbytes2; + idx += 2; + } + if (idx) { + idx -= (nelements & 0x1); + // Process the remaining few bytes + __m256i resVec = decodeDoubleAvx2(data, flag); + memcpy(out, &resVec, idx * DOUBLE_BYTES); + uint64_t *p = (uint64_t *)out; + for (int32_t j = 0; j < idx; ++j) { + p[j] = cur = (p[j] ^ cur); + } + out += idx * DOUBLE_BYTES; + } + return (int32_t)(out - output); } // decode two timestamps in one loop. 
@@ -332,7 +455,6 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; -#if __AVX2__ __m128i prevVal = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128(); @@ -464,17 +586,16 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, ostream[opos++] = prevVal[1] + prevDeltaX; } } -#endif return; } +#endif +#if __AVX512VL__ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool UNUSED_PARAM(bigEndian)) { int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; -#if __AVX512VL__ - __m128i prevVal = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128(); @@ -579,6 +700,6 @@ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelement } } -#endif return; } +#endif diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 0d8774ba41..4966a629d8 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -126,12 +126,12 @@ add_test( COMMAND regexTest ) -#add_executable(decompressTest "decompressTest.cpp") -#target_link_libraries(decompressTest os util common gtest_main) -#add_test( -# NAME decompressTest -# COMMAND decompressTest -#) +add_executable(decompressTest "decompressTest.cpp") +target_link_libraries(decompressTest os util common gtest_main) +add_test( + NAME decompressTest + COMMAND decompressTest +) if (${TD_LINUX}) # terrorTest @@ -147,4 +147,4 @@ if (${TD_LINUX}) add_custom_command(TARGET terrorTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $ ) -endif () \ No newline at end of file +endif () diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp index dfcd682255..e508c489df 100644 --- a/source/util/test/decompressTest.cpp +++ b/source/util/test/decompressTest.cpp @@ -1,14 +1,12 @@ +#define ALLOW_FORBID_FUNC #include #include 
#include +#include #include #include "ttypes.h" -namespace { - -} // namespace - -TEST(utilTest, decompress_ts_test) { +TEST(utilTest, DISABLED_decompress_ts_test) { { tsSIMDEnable = 1; tsAVX2Supported = 1; @@ -29,6 +27,7 @@ TEST(utilTest, decompress_ts_test) { std::cout << ((int64_t*)decompOutput)[i] << std::endl; } +#ifdef __AVX512VL__ memset(decompOutput, 0, 10 * 8); tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 10, reinterpret_cast(decompOutput), false); @@ -36,13 +35,16 @@ TEST(utilTest, decompress_ts_test) { for (int32_t i = 0; i < 10; ++i) { std::cout << ((int64_t*)decompOutput)[i] << std::endl; } +#endif //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - tsList[0] = 1286; tsList[1] = 1124; tsList[2]=2681; tsList[3] = 2823; + tsList[0] = 1286; + tsList[1] = 1124; + tsList[2] = 2681; + tsList[3] = 2823; -// char* pOutput[4 * sizeof(int64_t)] = {0}; - len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4, - ONE_STAGE_COMP, NULL, 0); + len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4, ONE_STAGE_COMP, + NULL, 0); decompOutput[4 * 8] = {0}; tsDecompressTimestamp(pOutput, len, 4, decompOutput, sizeof(int64_t) * 4, ONE_STAGE_COMP, NULL, 0); @@ -56,6 +58,7 @@ TEST(utilTest, decompress_ts_test) { int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7, ONE_STAGE_COMP, NULL, 0); +#ifdef __AVX512VL__ memset(decompOutput, 0, 10 * 8); tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 7, reinterpret_cast(decompOutput), false); @@ -63,12 +66,14 @@ TEST(utilTest, decompress_ts_test) { for (int32_t i = 0; i < 7; ++i) { std::cout << ((int64_t*)decompOutput)[i] << std::endl; } +#endif //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// int64_t tsList2[1] = {1700000000}; int32_t len2 = 
tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1, ONE_STAGE_COMP, NULL, 0); +#ifdef __AVX512VL__ memset(decompOutput, 0, 10 * 8); tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 1, reinterpret_cast(decompOutput), false); @@ -76,52 +81,10 @@ TEST(utilTest, decompress_ts_test) { for (int32_t i = 0; i < 1; ++i) { std::cout << ((int64_t*)decompOutput)[i] << std::endl; } +#endif } -TEST(utilTest, decompress_bigint_avx2_test) { - { - tsSIMDEnable = 1; - tsAVX2Supported = 1; - } - - int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400, - 1700000500, 1700000600, 1700000700, 1700000800, 1700000900}; - - char* pOutput[10 * sizeof(int64_t)] = {0}; - int32_t len = tsCompressBigint(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, - ONE_STAGE_COMP, NULL, 0); - - char* decompOutput[10 * 8] = {0}; - - tsDecompressBigint(pOutput, len, 10, decompOutput, sizeof(int64_t) * 10, ONE_STAGE_COMP, NULL, 0); - - for (int32_t i = 0; i < 10; ++i) { - std::cout << ((int64_t*)decompOutput)[i] << std::endl; - } -} - -TEST(utilTest, decompress_int_avx2_test) { - { - tsSIMDEnable = 1; - tsAVX2Supported = 1; - } - - int32_t tsList[10] = {17000000, 17000001, 17000002, 17000003, 17000004, - 17000005, 17000006, 17000007, 17000008, 17000009}; - - char* pOutput[10 * sizeof(int32_t)] = {0}; - int32_t len = - tsCompressInt(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0); - - char* decompOutput[10 * 8] = {0}; - tsDecompressInt(pOutput, len, 10, decompOutput, sizeof(int32_t) * 10, ONE_STAGE_COMP, NULL, 0); - - for (int32_t i = 0; i < 10; ++i) { - std::cout << ((int32_t*)decompOutput)[i] << std::endl; - } -} - -TEST(utilTest, decompress_perf_test) { +TEST(utilTest, DISABLED_decompress_perf_test) { int32_t num = 10000; int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); @@ -149,9 +112,11 @@ TEST(utilTest, decompress_perf_test) { 
memset(pOutput, 0, num * sizeof(int64_t)); st = taosGetTimestampUs(); +#ifdef __AVX512VL__ for (int32_t k = 0; k < 10000; ++k) { tsDecompressTimestampAvx512(px, num, pOutput, false); } +#endif int64_t el2 = taosGetTimestampUs() - st; std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl; @@ -303,7 +268,7 @@ void* genCompressData_float(int32_t type, int32_t num, bool order) { } return pBuf; } -TEST(utilTest, compressAlg) { +TEST(utilTest, DISABLED_compressAlg) { int32_t num = 4096; int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); int64_t iniVal = 17000; @@ -479,4 +444,214 @@ TEST(utilTest, compressAlg) { } taosMemoryFree(p); } -} \ No newline at end of file +} + +static uint32_t decompressRandomSeed; + +static void refreshSeed() { + decompressRandomSeed = std::random_device()(); + std::cout << "Refresh random seed to " << decompressRandomSeed << "\n"; +} + +#ifdef __GNUC__ +template +static std::vector::value, T>::type> utilTestRandomData(int32_t n, T min, + T max) { + std::mt19937 gen(decompressRandomSeed); + std::vector data(n); + + std::uniform_int_distribution dist(min, max); + for (auto& v : data) v = dist(gen); + return data; +} + +template +static std::vector::value, T>::type> utilTestRandomData(int32_t n, + T min, + T max) { + std::mt19937 gen(decompressRandomSeed); + std::vector data(n); + + std::uniform_real_distribution dist(min, max); + for (auto& v : data) v = dist(gen); + return data; +} +#else +template +static std::vector utilTestRandomData(int32_t n, T min, T max) { + std::vector data(n); + + for (int32_t i = 0; i < n; ++i) { + data[i] = (i & 0x1) ? 
min : max; + } + return data; +} +#endif + +template +static double measureRunTime(const F& func, int32_t nround = 1) { + auto start = std::chrono::high_resolution_clock::now(); + for (int32_t i = 0; i < nround; ++i) { + func(); + } + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + return duration / 1000.0; +} + +template +struct DataTypeSupportAvx { + static const bool value = false; +}; + +template <> +struct DataTypeSupportAvx { + static const bool value = true; +}; + +template <> +struct DataTypeSupportAvx { + static const bool value = true; +}; + +template +static void decompressBasicTest(size_t dataSize, const CompF& compress, const DecompF& decompress, T min, T max) { + auto origData = utilTestRandomData(dataSize, min, max); + std::vector compData(origData.size() * sizeof(origData[0]) + 1); + int32_t cnt = compress(origData.data(), origData.size(), origData.size(), compData.data(), compData.size(), + ONE_STAGE_COMP, nullptr, 0); + ASSERT_LE(cnt, compData.size()); + decltype(origData) decompData(origData.size()); + + // test simple implementation without SIMD instructions + tsSIMDEnable = 0; + cnt = decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(), + ONE_STAGE_COMP, nullptr, 0); + ASSERT_EQ(cnt, compData.size() - 1); + EXPECT_EQ(origData, decompData); + +#ifdef __AVX2__ + if (DataTypeSupportAvx::value) { + // test AVX2 implementation + tsSIMDEnable = 1; + tsAVX2Supported = 1; + cnt = decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(), + ONE_STAGE_COMP, nullptr, 0); + ASSERT_EQ(cnt, compData.size() - 1); + EXPECT_EQ(origData, decompData); + } +#endif +} + +template +static void decompressPerfTest(const char* typname, const CompF& compress, const DecompF& decompress, T min, T max) { + constexpr size_t DATA_SIZE = 1 * 1024 * 1024; + constexpr int32_t NROUND = 1000; + auto origData = 
utilTestRandomData(DATA_SIZE, min, max); + std::vector compData(origData.size() * sizeof(origData[0]) + 1); + int32_t cnt = compress(origData.data(), origData.size(), origData.size(), compData.data(), compData.size(), + ONE_STAGE_COMP, nullptr, 0); + ASSERT_LE(cnt, compData.size()); + if (compData[0] == 1) std::cout << "NOT COMPRESSED!\n"; + std::cout << "Original size: " << compData.size() - 1 << "; Compressed size: " << cnt + << "; Compression ratio: " << 1.0 * (compData.size() - 1) / cnt << "\n"; + decltype(origData) decompData(origData.size()); + + tsSIMDEnable = 0; + auto ms = measureRunTime( + [&]() { + decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(), + ONE_STAGE_COMP, nullptr, 0); + }, + NROUND); + std::cout << "Decompression of " << NROUND * DATA_SIZE << " " << typname << " without SIMD costs " << ms + << " ms, avg speed: " << NROUND * DATA_SIZE * 1000 / ms << " tuples/s\n"; + +#ifdef __AVX2__ + if (DataTypeSupportAvx::value) { + tsSIMDEnable = 1; + tsAVX2Supported = 1; + ms = measureRunTime( + [&]() { + decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(), + ONE_STAGE_COMP, nullptr, 0); + }, + NROUND); + std::cout << "Decompression of " << NROUND * DATA_SIZE << " " << typname << " using AVX2 costs " << ms + << " ms, avg speed: " << NROUND * DATA_SIZE * 1000 / ms << " tuples/s\n"; + } +#endif +} + +#define RUN_PERF_TEST(typname, comp, decomp, min, max) \ + do { \ + refreshSeed(); \ + decompressPerfTest(#typname, comp, decomp, min, max); \ + } while (0) + +TEST(utilTest, decompressTinyintBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressTinyint, tsDecompressTinyint, 0, 100); + } +} + +TEST(utilTest, decompressTinyintPerf) { RUN_PERF_TEST(int8_t, tsCompressTinyint, tsDecompressTinyint, 0, 100); } + +TEST(utilTest, decompressSmallintBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + 
decompressBasicTest(r, tsCompressSmallint, tsDecompressSmallint, 0, 10000); + } +} + +TEST(utilTest, decompressSmallintPerf) { RUN_PERF_TEST(int16_t, tsCompressSmallint, tsDecompressSmallint, 0, 10000); } + +TEST(utilTest, decompressIntBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressInt, tsDecompressInt, 0, 1000000); + } +} + +TEST(utilTest, decompressIntPerf) { RUN_PERF_TEST(int32_t, tsCompressInt, tsDecompressInt, 0, 1000000); } + +TEST(utilTest, decompressBigintBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressBigint, tsDecompressBigint, 0, 1000000000L); + } +} + +TEST(utilTest, decompressBigintPerf) { RUN_PERF_TEST(int64_t, tsCompressBigint, tsDecompressBigint, 0, 1000000000L); } + +TEST(utilTest, decompressFloatBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressFloat, tsDecompressFloat, 0, 99999); + } +} + +TEST(utilTest, decompressFloatPerf) { RUN_PERF_TEST(float, tsCompressFloat, tsDecompressFloat, 0, 99999); } + +TEST(utilTest, decompressDoubleBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressDouble, tsDecompressDouble, 0, 9999999999); + } +} + +TEST(utilTest, decompressDoublePerf) { RUN_PERF_TEST(double, tsCompressDouble, tsDecompressDouble, 0, 9999999999); } + + +TEST(utilTest, decompressTimestampBasic) { + refreshSeed(); + for (int32_t r = 1; r <= 4096; ++r) { + decompressBasicTest(r, tsCompressTimestamp, tsDecompressTimestamp, 0, 1000000000L); + } +} + +TEST(utilTest, decompressTimestampPerf) { + refreshSeed(); + decompressPerfTest("timestamp", tsCompressTimestamp, tsDecompressTimestamp, 0, 1000000000L); +}