enh(query)[TD-32652]: enable AVX implementation with CPU dispatching

Extract AVX implementation of specific functions into separate files.
The CPU dispatching mechanism will now check the machine's instruction
set at runtime to determine whether to use the AVX implementation or
fallback to the naive version.

This enhancement improves performance on most machines while ensuring
compatibility with older hardwares.
This commit is contained in:
Jinqing Kuang 2024-10-30 15:12:49 +08:00
parent c65f6c3e79
commit 647066c4f9
11 changed files with 434 additions and 374 deletions

View File

@ -177,48 +177,11 @@ ELSE()
SET(COMPILER_SUPPORT_AVX512VL false)
ELSE()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
INCLUDE(CheckCSourceRuns)
SET(CMAKE_REQUIRED_FLAGS "-mavx")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256d a, b, c;
double buf[4] = {0};
a = _mm256_loadu_pd(buf);
b = _mm256_loadu_pd(buf);
c = _mm256_add_pd(a, b);
_mm256_storeu_pd(buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
IF (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX)
SET(CMAKE_REQUIRED_FLAGS "-mavx2")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256i a, b, c;
int buf[8] = {0};
a = _mm256_loadu_si256((__m256i *)buf);
b = _mm256_loadu_si256((__m256i *)buf);
c = _mm256_and_si256(a, b);
_mm256_storeu_si256((__m256i *)buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
IF (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX2)
ENDIF()
IF(COMPILER_SUPPORT_SSE42)

View File

@ -152,15 +152,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
// for internal usage
int32_t getWordLength(char type);
#ifdef __AVX2__
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output);
int32_t tsDecompressDoubleImpAvx2(const char *input, int32_t nelements, char *output);
#endif
#ifdef __AVX512VL__
void tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian);
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
#endif
int32_t tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian);
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool bigEndian);
/*************************************************************************
* REGULAR COMPRESSION 2

View File

@ -41,6 +41,7 @@ extern const int32_t TYPE_BYTES[21];
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *)
#define M256_BYTES 32
#define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(TdUcs4)

View File

@ -1,6 +1,10 @@
aux_source_directory(src FUNCTION_SRC)
aux_source_directory(src/detail FUNCTION_SRC_DETAIL)
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
set_source_files_properties(src/detail/tminmaxavx.c PROPERTIES COMPILE_FLAGS -mavx2)
ENDIF()
add_library(function STATIC ${FUNCTION_SRC} ${FUNCTION_SRC_DETAIL})
target_include_directories(
function

View File

@ -25,6 +25,11 @@ extern "C" {
#include "functionResInfoInt.h"
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems);
int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res);
int32_t doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res);
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);

View File

@ -72,173 +72,6 @@
#define GET_INVOKE_INTRINSIC_THRESHOLD(_bits, _bytes) ((_bits) / ((_bytes) << 3u))
#ifdef __AVX2__
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
*width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width);
*rounds = numOfRows / (*width);
}
#define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec))
#define EXTRACT_MIN_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec))
#define CMP_TYPE_MIN_MAX(type, cmp) \
const type* p = pData; \
__m256i initVal = _mm256_lddqu_si256((__m256i*)p); \
p += width; \
for (int32_t i = 1; i < (rounds); ++i) { \
__m256i next = _mm256_lddqu_si256((__m256i*)p); \
initVal = CMP_FUNC_##cmp##_##type(initVal, next); \
p += width; \
} \
const type* q = (const type*)&initVal; \
type* v = (type*)res; \
EXTRACT_##cmp##_VAL(q, p, width, remain, *v)
static void i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
const int8_t* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int8_t _mm256_min_epi8
#define CMP_FUNC_MAX_int8_t _mm256_max_epi8
#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8
#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MIN);
}
}
}
static void i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int16_t _mm256_min_epi16
#define CMP_FUNC_MAX_int16_t _mm256_max_epi16
#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16
#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MIN);
}
}
}
static void i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int32_t _mm256_min_epi32
#define CMP_FUNC_MAX_int32_t _mm256_max_epi32
#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32
#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MIN);
}
}
}
static void floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) {
const float* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width);
__m256 next;
__m256 initVal = _mm256_loadu_ps(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_max_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_min_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
}
static void doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res) {
const double* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width);
__m256d next;
__m256d initVal = _mm256_loadu_pd(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_max_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_min_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
}
#endif
static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows, bool isStr) {
int32_t i = start;
@ -255,31 +88,31 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
pBuf->v = ((const int8_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= sizeof(__m256i)) {
i8VectorCmpAVX2(data + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int8_t* p = (const int8_t*)data;
int8_t* v = (int8_t*)&pBuf->v;
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= M256_BYTES) {
int32_t code = i8VectorCmpAVX2(((char*)data) + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (signVal) {
const int8_t* p = (const int8_t*)data;
int8_t* v = (int8_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
const uint8_t* p = (const uint8_t*)data;
uint8_t* v = (uint8_t*)&pBuf->v;
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
} else {
const uint8_t* p = (const uint8_t*)data;
uint8_t* v = (uint8_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
@ -292,31 +125,31 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = ((const int16_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= sizeof(__m256i)) {
i16VectorCmpAVX2(data + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int16_t* p = (const int16_t*)data;
int16_t* v = (int16_t*)&pBuf->v;
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= M256_BYTES) {
int32_t code = i16VectorCmpAVX2(((char*)data) + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (signVal) {
const int16_t* p = (const int16_t*)data;
int16_t* v = (int16_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
const uint16_t* p = (const uint16_t*)data;
uint16_t* v = (uint16_t*)&pBuf->v;
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
} else {
const uint16_t* p = (const uint16_t*)data;
uint16_t* v = (uint16_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
@ -329,31 +162,31 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = ((const int32_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= sizeof(__m256i)) {
i32VectorCmpAVX2(data + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int32_t* p = (const int32_t*)data;
int32_t* v = (int32_t*)&pBuf->v;
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= M256_BYTES) {
int32_t code = i32VectorCmpAVX2(((char*)data) + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (signVal) {
const int32_t* p = (const int32_t*)data;
int32_t* v = (int32_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
const uint32_t* p = (const uint32_t*)data;
uint32_t* v = (uint32_t*)&pBuf->v;
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
} else {
const uint32_t* p = (const uint32_t*)data;
uint32_t* v = (uint32_t*)&pBuf->v;
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
if (isMinFunc) {
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *v, p);
} else {
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
@ -397,20 +230,20 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
*val = pData[start];
}
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(float) >= sizeof(__m256i)) {
floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
#else
if (true) {
#endif
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(float) >= M256_BYTES) {
int32_t code = floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
pBuf->assign = true;
}
@ -422,20 +255,20 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
*val = pData[start];
}
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(double) >= sizeof(__m256i)) {
doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
#else
if (true) {
#endif
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(double) >= M256_BYTES) {
int32_t code = doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
pBuf->assign = true;
}

View File

@ -0,0 +1,227 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "builtinsimpl.h"
#ifdef __AVX2__
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
*width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width);
*rounds = numOfRows / (*width);
}
#define __COMPARE_EXTRACT_MIN(start, end, val, _data) \
for (int32_t i = (start); i < (end); ++i) { \
if ((val) > (_data)[i]) { \
(val) = (_data)[i]; \
} \
}
#define __COMPARE_EXTRACT_MAX(start, end, val, _data) \
for (int32_t i = (start); i < (end); ++i) { \
if ((val) < (_data)[i]) { \
(val) = (_data)[i]; \
} \
}
#define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec))
#define EXTRACT_MIN_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec))
#define CMP_TYPE_MIN_MAX(type, cmp) \
const type* p = pData; \
__m256i initVal = _mm256_lddqu_si256((__m256i*)p); \
p += width; \
for (int32_t i = 1; i < (rounds); ++i) { \
__m256i next = _mm256_lddqu_si256((__m256i*)p); \
initVal = CMP_FUNC_##cmp##_##type(initVal, next); \
p += width; \
} \
const type* q = (const type*)&initVal; \
type* v = (type*)res; \
EXTRACT_##cmp##_VAL(q, p, width, remain, *v)
#endif
int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
const int8_t* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int8_t _mm256_min_epi8
#define CMP_FUNC_MAX_int8_t _mm256_max_epi8
#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8
#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int16_t _mm256_min_epi16
#define CMP_FUNC_MAX_int16_t _mm256_max_epi16
#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16
#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int32_t _mm256_min_epi32
#define CMP_FUNC_MAX_int32_t _mm256_max_epi32
#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32
#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) {
#ifdef __AVX2__
const float* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width);
__m256 next;
__m256 initVal = _mm256_loadu_ps(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_max_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_min_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res) {
#ifdef __AVX2__
const double* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width);
__m256d next;
__m256d initVal = _mm256_loadu_pd(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_max_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_min_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}

View File

@ -37,7 +37,6 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
char *tsProcPath = NULL;
char tsSIMDEnable = 1;
char tsAVX512Enable = 0;
char tsSSE42Supported = 0;
char tsAVXSupported = 0;

View File

@ -1,5 +1,9 @@
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/src/version.c.in" "${CMAKE_CURRENT_SOURCE_DIR}/src/version.c")
aux_source_directory(src UTIL_SRC)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
set_source_files_properties(src/tdecompressavx.c PROPERTIES COMPILE_FLAGS -mavx2)
ENDIF()
add_library(util STATIC ${UTIL_SRC})
if(DEFINED GRANT_CFG_INCLUDE_DIR)

View File

@ -471,12 +471,12 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
return nelements * word_length;
}
#ifdef __AVX512F__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressIntImpl_Hw(input, nelements, output, type);
return nelements * word_length;
int32_t cnt = tsDecompressIntImpl_Hw(input, nelements, output, type);
if (cnt >= 0) {
return cnt;
}
}
#endif
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
@ -867,12 +867,12 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes;
} else if (input[0] == 1) { // Decompress
#ifdef __AVX512VL__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
return nelements * longBytes;
int32_t cnt = tsDecompressTimestampAvx512(input, nelements, output, false);
if (cnt >= 0) {
return cnt;
}
}
#endif
int64_t *ostream = (int64_t *)output;
@ -1103,13 +1103,14 @@ int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int
return nelements * DOUBLE_BYTES;
}
#ifdef __AVX2__
// use AVX2 implementation when allowed and the compression ratio is not high
double compressRatio = 1.0 * nelements * DOUBLE_BYTES / ninput;
if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
return tsDecompressDoubleImpAvx2(input + 1, nelements, output);
int32_t cnt = tsDecompressDoubleImpAvx2(input + 1, nelements, output);
if (cnt >= 0) {
return cnt;
}
}
#endif
// use implementation without SIMD instructions by default
return tsDecompressDoubleImpHelper(input + 1, nelements, output);
@ -1257,13 +1258,14 @@ int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int3
return nelements * FLOAT_BYTES;
}
#ifdef __AVX2__
// use AVX2 implementation when allowed and the compression ratio is not high
double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput;
if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
return tsDecompressFloatImpAvx2(input + 1, nelements, output);
int32_t cnt = tsDecompressFloatImpAvx2(input + 1, nelements, output);
if (cnt >= 0) {
return cnt;
}
}
#endif
// use implementation without SIMD instructions by default
return tsDecompressFloatImpHelper(input + 1, nelements, output);
@ -1883,3 +1885,26 @@ int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, u
return update;
}
int32_t getWordLength(char type) {
int32_t wordLength = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
wordLength = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
wordLength = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
wordLength = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
wordLength = CHAR_BYTES;
break;
default:
uError("Invalid decompress integer type:%d", type);
return TSDB_CODE_INVALID_PARA;
}
return wordLength;
}

View File

@ -13,35 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcompression.h"
#include "ttypes.h"
int32_t getWordLength(char type) {
int32_t wordLength = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
wordLength = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
wordLength = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
wordLength = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
wordLength = CHAR_BYTES;
break;
default:
uError("Invalid decompress integer type:%d", type);
return TSDB_CODE_INVALID_PARA;
}
return wordLength;
}
#ifdef __AVX2__
char tsSIMDEnable = 1;
#else
char tsSIMDEnable = 0;
#endif
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
#ifdef __AVX2__
int32_t word_length = getWordLength(type);
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@ -75,12 +56,12 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int32_t batch = 0;
int32_t remain = 0;
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
batch = num >> 3;
remain = num & 0x07;
#endif
} else if (tsSIMDEnable && tsAVX2Supported) {
#if __AVX2__
#ifdef __AVX2__
batch = num >> 2;
remain = num & 0x03;
#endif
@ -88,7 +69,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
for (int32_t i = 0; i < batch; ++i) {
__m512i prev = _mm512_set1_epi64(prevValue);
_mm512_storeu_si512((__m512i *)&p[_pos], prev);
@ -117,7 +98,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
}
} else {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
__m512i sum_mask1 = _mm512_set_epi64(6, 6, 4, 4, 2, 2, 0, 0);
__m512i sum_mask2 = _mm512_set_epi64(5, 5, 5, 5, 1, 1, 1, 1);
__m512i sum_mask3 = _mm512_set_epi64(3, 3, 3, 3, 3, 3, 3, 3);
@ -310,10 +291,13 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
}
return nelements * word_length;
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
#define M256_BYTES sizeof(__m256i)
#ifdef __AVX2__
FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
@ -332,7 +316,27 @@ FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) {
return diffVec;
}
FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
__m256i k7 = _mm256_set1_epi64x(7);
__m256i lopart = _mm256_set_epi64x(0, -1, 0, -1);
__m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0);
__m256i trTail = _mm256_cmpgt_epi64(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1));
__m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3);
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi64(dataVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
return diffVec;
}
#endif
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output) {
#ifdef __AVX2__
// Allocate memory-aligned buffer
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
@ -343,7 +347,7 @@ int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *out
// Load data into the buffer for batch processing
int32_t batchSize = M256_BYTES / FLOAT_BYTES;
int32_t idx = 0;
int32_t idx = 0;
uint32_t cur = 0;
for (int32_t i = 0; i < nelements; i += 2) {
if (idx == batchSize) {
@ -380,27 +384,14 @@ int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *out
out += idx * FLOAT_BYTES;
}
return (int32_t)(out - output);
}
FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
__m256i k7 = _mm256_set1_epi64x(7);
__m256i lopart = _mm256_set_epi64x(0, -1, 0, -1);
__m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0);
__m256i trTail = _mm256_cmpgt_epi64(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1));
__m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3);
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi64(dataVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
return diffVec;
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, char *const output) {
#ifdef __AVX2__
// Allocate memory-aligned buffer
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
@ -448,12 +439,15 @@ int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, ch
out += idx * DOUBLE_BYTES;
}
return (int32_t)(out - output);
}
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
#if __AVX512VL__
// decode two timestamps in one loop.
void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian) {
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output,
bool bigEndian) {
#ifdef __AVX512VL__
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
@ -588,11 +582,16 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements,
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}
return;
return opos;
#else
uError("unable run %s without avx512 instructions", __func__);
return -1;
#endif
}
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
#ifdef __AVX512VL__
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
@ -700,6 +699,9 @@ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelement
}
}
return;
}
return opos;
#else
uError("unable run %s without avx512 instructions", __func__);
return -1;
#endif
}