enh(query)[TS-4661]. Add AVX2 support for decompression and MIN/MAX comparison

- Implement AVX2 instructions to decompress float and double data types
- Add AVX2-based MIN/MAX comparison for both integer and floating-point data types
- Include unit tests to verify functionality and performance improvements
This commit is contained in:
Jinqing Kuang 2024-09-11 14:30:34 +08:00
parent 3860ddc83d
commit 636c65d62f
11 changed files with 757 additions and 806 deletions

View File

@ -177,16 +177,17 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
ENDIF()
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
ENDIF()
IF (COMPILER_SUPPORT_AVX2)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
MESSAGE(STATUS "FMA instructions is ACTIVATED")
ENDIF()
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
ENDIF()
IF (COMPILER_SUPPORT_AVX2)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED")
IF ("${SIMD_AVX512_SUPPORT}" MATCHES "true")
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)

View File

@ -152,11 +152,15 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
// for internal usage
int32_t getWordLength(char type);
#ifdef __AVX2__
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
void tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
void tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output);
int32_t tsDecompressDoubleImpAvx2(const char *input, int32_t nelements, char *output);
void tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian);
#endif
#ifdef __AVX512VL__
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
#endif
/*************************************************************************
* REGULAR COMPRESSION 2
@ -213,8 +217,8 @@ typedef int32_t (*__data_compress_init)(char *lossyColumns, float fPrecision, do
uint32_t intervals, int32_t ifAdtFse, const char *compressor);
typedef int32_t (*__data_compress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, const int32_t nelements, char *const output,
const char type);
typedef int32_t (*__data_decompress_l1_fn_t)(const char *const input, int32_t ninput, const int32_t nelements,
char *const output, const char type);
typedef int32_t (*__data_compress_l2_fn_t)(const char *const input, const int32_t nelements, char *const output,
int32_t outputSize, const char type, int8_t level);

View File

@ -645,11 +645,6 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableCoreFile", tsEnableCoreFile, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "ssd42", tsSSE42Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx", tsAVXSupported, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx2", tsAVX2Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "fma", tsFMASupported, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "avx512", tsAVX512Supported, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "AVX512Enable", tsAVX512Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH, CFG_DYN_NONE));
@ -1397,6 +1392,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable");
tsSIMDEnable = (bool)pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "AVX512Enable");
tsAVX512Enable = (bool)pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tagFilterCache");
tsTagFilterCache = (bool)pItem->bval;

View File

@ -92,254 +92,6 @@
out->sum.usum += val; \
}
/*
 * Add the first numOfRows values of plist to pRes->sum.dsum.
 *
 * plist     : pointer to the first float to sum (no alignment requirement).
 * numOfRows : number of floats to sum; values <= 0 leave pRes untouched.
 * pRes      : accumulator; only sum.dsum is updated.
 *
 * When the translation unit is compiled with AVX enabled, full groups of
 * eight floats are summed with 256-bit instructions and the leftover
 * elements are summed one by one. Without AVX the whole input is summed
 * one by one, so the function never silently drops the data.
 */
static void floatVectorSumAVX(const float* plist, int32_t numOfRows, SAvgRes* pRes) {
  int32_t done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX__)
  const int32_t width = (int32_t)(sizeof(__m256) / sizeof(float));  // 8 floats per 256-bit load
  const int32_t rounds = numOfRows / width;

  // The result is stored in a double, so widen every float to double before
  // adding. Accumulating in float lanes would lose precision on long columns
  // and disagree with the element-by-element summation.
  __m256d sumLo = _mm256_setzero_pd();
  __m256d sumHi = _mm256_setzero_pd();

  for (int32_t i = 0; i < rounds; ++i) {
    // unaligned load: the column buffer is not guaranteed to be 32-byte aligned
    __m256 val = _mm256_loadu_ps(plist + i * width);
    sumLo = _mm256_add_pd(sumLo, _mm256_cvtps_pd(_mm256_castps256_ps128(val)));
    sumHi = _mm256_add_pd(sumHi, _mm256_cvtps_pd(_mm256_extractf128_ps(val, 1)));
  }

  if (rounds > 0) {
    // horizontal reduction of the four double lanes
    double lanes[4];
    _mm256_storeu_pd(lanes, _mm256_add_pd(sumLo, sumHi));
    pRes->sum.dsum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX is not compiled in)
  for (int32_t j = done; j < numOfRows; ++j) {
    pRes->sum.dsum += plist[j];
  }
}
/*
 * Add the first numOfRows values of plist to pRes->sum.dsum.
 *
 * plist     : pointer to the first double to sum (no alignment requirement).
 * numOfRows : number of doubles to sum; values <= 0 leave pRes untouched.
 * pRes      : accumulator; only sum.dsum is updated.
 *
 * With AVX compiled in, full groups of four doubles are summed with 256-bit
 * instructions; leftover elements are summed one by one. Without AVX the
 * whole input is summed one by one instead of being silently ignored.
 */
static void doubleVectorSumAVX(const double* plist, int32_t numOfRows, SAvgRes* pRes) {
  int32_t done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX__)
  const int32_t width = (int32_t)(sizeof(__m256d) / sizeof(double));  // 4 doubles per 256-bit load
  const int32_t rounds = numOfRows / width;

  __m256d sum = _mm256_setzero_pd();
  for (int32_t i = 0; i < rounds; ++i) {
    // unaligned load: the column buffer is not guaranteed to be 32-byte aligned
    sum = _mm256_add_pd(sum, _mm256_loadu_pd(plist + i * width));
  }

  if (rounds > 0) {
    // horizontal reduction of the four lanes
    double lanes[4];
    _mm256_storeu_pd(lanes, sum);
    pRes->sum.dsum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX is not compiled in)
  for (int32_t j = done; j < numOfRows; ++j) {
    pRes->sum.dsum += plist[j];
  }
}
/*
 * Add the first numOfRows 8-bit values of plist to the integer sum in pRes.
 *
 * plist     : pointer to the first value (no alignment requirement).
 * numOfRows : number of values to sum; values <= 0 leave pRes untouched.
 * type      : TSDB_DATA_TYPE_TINYINT sums the values as signed into
 *             pRes->sum.isum; any other type reinterprets them as uint8_t and
 *             sums into pRes->sum.usum.
 * pRes      : accumulator.
 *
 * With AVX2 compiled in, four values per round are widened to 64-bit lanes
 * and added; leftover elements are added one by one. Without AVX2 the whole
 * input is added one by one instead of being silently ignored.
 *
 * NOTE(review): no overflow detection is performed here, unlike the scalar
 * loops in avgFunction that use CHECK_OVERFLOW_SUM_* - confirm that is
 * acceptable for the callers of this helper.
 */
static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
  const bool isSigned = (type == TSDB_DATA_TYPE_TINYINT);
  int32_t    done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX2__)
  const int32_t  width = (int32_t)(sizeof(__m256i) / sizeof(int64_t));  // 4 x 64-bit lanes
  const int32_t  rounds = numOfRows / width;
  const uint8_t* bytes = (const uint8_t*)plist;

  __m256i sum = _mm256_setzero_si256();
  for (int32_t i = 0; i < rounds; ++i) {
    const uint8_t* b = bytes + i * width;

    // Only four bytes are consumed per round, so load exactly four bytes.
    // A 128-bit load here would read up to 12 bytes past the end of the
    // column on the last round. x86 is little-endian, so b[0] lands in the
    // lowest byte of the vector.
    uint32_t packed =
        (uint32_t)b[0] | ((uint32_t)b[1] << 8) | ((uint32_t)b[2] << 16) | ((uint32_t)b[3] << 24);
    __m128i val = _mm_cvtsi32_si128((int)packed);

    // sign- or zero-extend the four low bytes to four 64-bit lanes
    __m256i ext = isSigned ? _mm256_cvtepi8_epi64(val) : _mm256_cvtepu8_epi64(val);
    sum = _mm256_add_epi64(sum, ext);
  }

  if (rounds > 0) {
    // horizontal reduction of the four lanes
    uint64_t lanes[4];
    _mm256_storeu_si256((__m256i*)lanes, sum);
    if (isSigned) {
      pRes->sum.isum += (int64_t)lanes[0] + (int64_t)lanes[1] + (int64_t)lanes[2] + (int64_t)lanes[3];
    } else {
      pRes->sum.usum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    }
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX2 is not compiled in)
  if (isSigned) {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.isum += plist[j];
    }
  } else {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.usum += (uint8_t)plist[j];
    }
  }
}
/*
 * Add the first numOfRows 16-bit values of plist to the integer sum in pRes.
 *
 * plist     : pointer to the first value (no alignment requirement).
 * numOfRows : number of values to sum; values <= 0 leave pRes untouched.
 * type      : TSDB_DATA_TYPE_SMALLINT sums the values as signed into
 *             pRes->sum.isum; any other type reinterprets them as uint16_t
 *             and sums into pRes->sum.usum.
 * pRes      : accumulator.
 *
 * With AVX2 compiled in, four values per round are widened to 64-bit lanes
 * and added; leftover elements are added one by one. Without AVX2 the whole
 * input is added one by one instead of being silently ignored.
 *
 * NOTE(review): no overflow detection is performed here, unlike the scalar
 * loops in avgFunction that use CHECK_OVERFLOW_SUM_* - confirm that is
 * acceptable for the callers of this helper.
 */
static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
  const bool isSigned = (type == TSDB_DATA_TYPE_SMALLINT);
  int32_t    done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX2__)
  const int32_t width = (int32_t)(sizeof(__m256i) / sizeof(int64_t));  // 4 x 64-bit lanes
  const int32_t rounds = numOfRows / width;

  __m256i sum = _mm256_setzero_si256();
  for (int32_t i = 0; i < rounds; ++i) {
    // Only four 16-bit values (8 bytes) are consumed per round, so use a
    // 64-bit load. A 128-bit load would read up to 8 bytes past the end of
    // the column on the last round.
    __m128i val = _mm_loadl_epi64((const __m128i*)(plist + i * width));

    // sign- or zero-extend the four low words to four 64-bit lanes
    __m256i ext = isSigned ? _mm256_cvtepi16_epi64(val) : _mm256_cvtepu16_epi64(val);
    sum = _mm256_add_epi64(sum, ext);
  }

  if (rounds > 0) {
    // horizontal reduction of the four lanes
    uint64_t lanes[4];
    _mm256_storeu_si256((__m256i*)lanes, sum);
    if (isSigned) {
      pRes->sum.isum += (int64_t)lanes[0] + (int64_t)lanes[1] + (int64_t)lanes[2] + (int64_t)lanes[3];
    } else {
      pRes->sum.usum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    }
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX2 is not compiled in)
  if (isSigned) {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.isum += plist[j];
    }
  } else {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.usum += (uint16_t)plist[j];
    }
  }
}
/*
 * Add the first numOfRows 32-bit values of plist to the integer sum in pRes.
 *
 * plist     : pointer to the first value (no alignment requirement).
 * numOfRows : number of values to sum; values <= 0 leave pRes untouched.
 * type      : TSDB_DATA_TYPE_INT sums the values as signed into
 *             pRes->sum.isum; any other type reinterprets them as uint32_t
 *             and sums into pRes->sum.usum.
 * pRes      : accumulator.
 *
 * With AVX2 compiled in, four values per round are widened to 64-bit lanes
 * and added; leftover elements are added one by one. Without AVX2 the whole
 * input is added one by one instead of being silently ignored.
 *
 * NOTE(review): no overflow detection is performed here, unlike the scalar
 * loops in avgFunction that use CHECK_OVERFLOW_SUM_* - confirm that is
 * acceptable for the callers of this helper.
 */
static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t type, SAvgRes* pRes) {
  const bool isSigned = (type == TSDB_DATA_TYPE_INT);
  int32_t    done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX2__)
  const int32_t width = (int32_t)(sizeof(__m256i) / sizeof(int64_t));  // 4 x 64-bit lanes
  const int32_t rounds = numOfRows / width;

  __m256i sum = _mm256_setzero_si256();
  for (int32_t i = 0; i < rounds; ++i) {
    // four 32-bit values are exactly 128 bits; unaligned load
    __m128i val = _mm_lddqu_si128((const __m128i*)(plist + i * width));

    // sign- or zero-extend the four values to four 64-bit lanes
    __m256i ext = isSigned ? _mm256_cvtepi32_epi64(val) : _mm256_cvtepu32_epi64(val);
    sum = _mm256_add_epi64(sum, ext);
  }

  if (rounds > 0) {
    // horizontal reduction of the four lanes
    uint64_t lanes[4];
    _mm256_storeu_si256((__m256i*)lanes, sum);
    if (isSigned) {
      pRes->sum.isum += (int64_t)lanes[0] + (int64_t)lanes[1] + (int64_t)lanes[2] + (int64_t)lanes[3];
    } else {
      pRes->sum.usum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    }
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX2 is not compiled in)
  if (isSigned) {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.isum += plist[j];
    }
  } else {
    for (int32_t j = done; j < numOfRows; ++j) {
      pRes->sum.usum += (uint32_t)plist[j];
    }
  }
}
/*
 * Add the first numOfRows signed 64-bit values of plist to pRes->sum.isum.
 *
 * plist     : pointer to the first value (no alignment requirement).
 * numOfRows : number of values to sum; values <= 0 leave pRes untouched.
 * pRes      : accumulator; only sum.isum is updated.
 *
 * With AVX2 compiled in, four values per round are added with 256-bit
 * instructions; leftover elements are added one by one. Without AVX2 the
 * whole input is added one by one instead of being silently ignored.
 *
 * NOTE(review): no overflow detection is performed here, unlike the scalar
 * loops in avgFunction that use CHECK_OVERFLOW_SUM_* - confirm that is
 * acceptable for the callers of this helper.
 */
static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* pRes) {
  int32_t done = 0;  // number of leading elements already folded into the sum

#if defined(__AVX2__)
  const int32_t width = (int32_t)(sizeof(__m256i) / sizeof(int64_t));  // 4 x 64-bit lanes
  const int32_t rounds = numOfRows / width;

  __m256i sum = _mm256_setzero_si256();
  for (int32_t i = 0; i < rounds; ++i) {
    // unaligned load: the column buffer is not guaranteed to be 32-byte aligned
    __m256i val = _mm256_lddqu_si256((const __m256i*)(plist + i * width));
    sum = _mm256_add_epi64(sum, val);
  }

  if (rounds > 0) {
    // horizontal reduction of the four lanes
    int64_t lanes[4];
    _mm256_storeu_si256((__m256i*)lanes, sum);
    pRes->sum.isum += lanes[0] + lanes[1] + lanes[2] + lanes[3];
    done = rounds * width;
  }
#endif

  // tail elements (or the whole input when AVX2 is not compiled in)
  for (int32_t j = done; j < numOfRows; ++j) {
    pRes->sum.isum += plist[j];
  }
}
/* Report the size, in bytes, of the SAvgRes structure. */
int32_t getAvgInfoSize() {
  const int32_t avgResBytes = (int32_t)sizeof(SAvgRes);
  return avgResBytes;
}
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
@ -561,23 +313,16 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
numOfElem = pInput->numOfRows;
pAvgRes->count += pInput->numOfRows;
bool simdAvailable = tsAVXSupported && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);
switch(type) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
const int8_t* plist = (const int8_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i8VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_TINYINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint8_t)plist[i])
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_TINYINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint8_t)plist[i])
}
}
break;
@ -587,16 +332,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_SMALLINT: {
const int16_t* plist = (const int16_t*)pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i16VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_SMALLINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint16_t)plist[i])
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_SMALLINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint16_t)plist[i])
}
}
break;
@ -606,16 +346,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_INT: {
const int32_t* plist = (const int32_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
i32VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_INT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint32_t)plist[i])
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_INT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint32_t)plist[i])
}
}
break;
@ -625,16 +360,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_BIGINT: {
const int64_t* plist = (const int64_t*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_BIGINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint64_t)plist[i])
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
if (type == TSDB_DATA_TYPE_BIGINT) {
CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i])
} else {
CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint64_t)plist[i])
}
}
break;
@ -643,26 +373,16 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_FLOAT: {
const float* plist = (const float*) pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
floatVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
const double* plist = (const double*)pCol->pData;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if (simdAvailable) {
doubleVectorSumAVX(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.dsum += plist[i];
}
break;
}

View File

@ -72,6 +72,7 @@
#define GET_INVOKE_INTRINSIC_THRESHOLD(_bits, _bytes) ((_bits) / ((_bytes) << 3u))
#ifdef __AVX2__
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
@ -81,224 +82,104 @@ static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder
}
#define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \
(_v) = TMAX((_first)[0], (_first)[1]); \
for (int32_t k = 1; k < (_width); ++k) { \
(_v) = TMAX((_v), (_first)[k]); \
} \
\
for (int32_t j = 0; j < (_remain); ++j) { \
if ((_v) < (_sec)[j]) { \
(_v) = (_sec)[j]; \
} \
}
__COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec))
#define EXTRACT_MIN_VAL(_first, _sec, _width, _remain, _v) \
(_v) = TMIN((_first)[0], (_first)[1]); \
for (int32_t k = 1; k < (_width); ++k) { \
(_v) = TMIN((_v), (_first)[k]); \
} \
\
for (int32_t j = 0; j < (_remain); ++j) { \
if ((_v) > (_sec)[j]) { \
(_v) = (_sec)[j]; \
} \
}
__COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec))
static int8_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal) {
int8_t v = 0;
#define CMP_TYPE_MIN_MAX(type, cmp) \
const type* p = pData; \
__m256i initVal = _mm256_lddqu_si256((__m256i*)p); \
p += width; \
for (int32_t i = 1; i < (rounds); ++i) { \
__m256i next = _mm256_lddqu_si256((__m256i*)p); \
initVal = CMP_FUNC_##cmp##_##type(initVal, next); \
p += width; \
} \
const type* q = (const type*)&initVal; \
type* v = (type*)res; \
EXTRACT_##cmp##_VAL(q, p, width, remain, *v)
static void i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
const int8_t* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width);
#if __AVX2__
__m256i next;
__m256i initVal = _mm256_lddqu_si256((__m256i*)p);
p += width;
#define CMP_FUNC_MIN_int8_t _mm256_min_epi8
#define CMP_FUNC_MAX_int8_t _mm256_max_epi8
#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8
#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8
if (!isMinFunc) { // max function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi8(initVal, next);
p += width;
}
const int8_t* q = (const int8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epu8(initVal, next);
p += width;
}
const uint8_t* q = (const uint8_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int8_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MAX);
}
} else { // min function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epi8(initVal, next);
p += width;
}
// let sum up the final results
const int8_t* q = (const int8_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int8_t, MIN);
} else {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epu8(initVal, next);
p += width;
}
// let sum up the final results
const uint8_t* q = (const uint8_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(uint8_t, MIN);
}
}
#endif
return v;
}
static int16_t i16VectorCmpAVX2(const int16_t* pData, int32_t numOfRows, bool isMinFunc, bool signVal) {
int16_t v = 0;
const int16_t* p = pData;
static void i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width);
#if __AVX2__
__m256i next;
__m256i initVal = _mm256_lddqu_si256((__m256i*)p);
p += width;
#define CMP_FUNC_MIN_int16_t _mm256_min_epi16
#define CMP_FUNC_MAX_int16_t _mm256_max_epi16
#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16
#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16
if (!isMinFunc) { // max function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi16(initVal, next);
p += width;
}
// let sum up the final results
const int16_t* q = (const int16_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int16_t, MAX);
} else {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epu16(initVal, next);
p += width;
}
// let sum up the final results
const uint16_t* q = (const uint16_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(uint16_t, MAX);
}
} else { // min function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epi16(initVal, next);
p += width;
}
// let sum up the final results
const int16_t* q = (const int16_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int16_t, MIN);
} else {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epi16(initVal, next);
p += width;
}
// let sum up the final results
const uint16_t* q = (const uint16_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(uint16_t, MIN);
}
}
#endif
return v;
}
static int32_t i32VectorCmpAVX2(const int32_t* pData, int32_t numOfRows, bool isMinFunc, bool signVal) {
int32_t v = 0;
const int32_t* p = pData;
static void i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width);
#if __AVX2__
__m256i next;
__m256i initVal = _mm256_lddqu_si256((__m256i*)p);
p += width;
#define CMP_FUNC_MIN_int32_t _mm256_min_epi32
#define CMP_FUNC_MAX_int32_t _mm256_max_epi32
#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32
#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32
if (!isMinFunc) { // max function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi32(initVal, next);
p += width;
}
// let compare the final results
const int32_t* q = (const int32_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
} else { // unsigned value
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_max_epi32(initVal, next);
p += width;
}
// let compare the final results
const uint32_t* q = (const uint32_t*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int32_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MAX);
}
} else { // min function
if (signVal) {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epi32(initVal, next);
p += width;
}
// let sum up the final results
const int32_t* q = (const int32_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(int32_t, MIN);
} else {
for (int32_t i = 0; i < rounds; ++i) {
next = _mm256_lddqu_si256((__m256i*)p);
initVal = _mm256_min_epu32(initVal, next);
p += width;
}
// let sum up the final results
const uint32_t* q = (const uint32_t*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
CMP_TYPE_MIN_MAX(uint32_t, MIN);
}
}
#endif
return v;
}
static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMinFunc) {
float v = 0;
static void floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) {
const float* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width);
#if __AVX__
__m256 next;
__m256 initVal = _mm256_loadu_ps(p);
p += width;
@ -311,7 +192,7 @@ static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMin
}
const float* q = (const float*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
@ -320,22 +201,16 @@ static float floatVectorCmpAVX(const float* pData, int32_t numOfRows, bool isMin
}
const float* q = (const float*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
#endif
return v;
}
static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool isMinFunc) {
double v = 0;
static void doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res) {
const double* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width);
#if __AVX__
__m256d next;
__m256d initVal = _mm256_loadu_pd(p);
p += width;
@ -349,7 +224,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, v)
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
@ -359,12 +234,10 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, v)
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
#endif
return v;
}
#endif
static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows, bool isStr) {
int32_t i = start;
@ -378,14 +251,17 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start,
static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Supported && tsSIMDEnable) {
pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int8_t*)data)[start];
}
if (!pBuf->assign) {
pBuf->v = ((const int8_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= sizeof(__m256i)) {
i8VectorCmpAVX2(data + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int8_t* p = (const int8_t*)data;
int8_t* v = (int8_t*)&pBuf->v;
@ -412,14 +288,17 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Supported && tsSIMDEnable) {
pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int16_t*)data)[start];
}
if (!pBuf->assign) {
pBuf->v = ((const int16_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= sizeof(__m256i)) {
i16VectorCmpAVX2(data + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int16_t* p = (const int16_t*)data;
int16_t* v = (int16_t*)&pBuf->v;
@ -446,14 +325,17 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Supported && tsSIMDEnable) {
pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int32_t*)data)[start];
}
if (!pBuf->assign) {
pBuf->v = ((const int32_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= sizeof(__m256i)) {
i32VectorCmpAVX2(data + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (signVal) {
const int32_t* p = (const int32_t*)data;
int32_t* v = (int32_t*)&pBuf->v;
@ -481,7 +363,7 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
if (!pBuf->assign) {
pBuf->v = ((int64_t*)data)[start];
pBuf->v = ((const int64_t*)data)[start];
}
if (signVal) {
@ -503,33 +385,29 @@ static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, S
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
pBuf->assign = true;
}
static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf,
bool isMinFunc) {
float* pData = (float*)pCol->pData;
float* val = (float*)&pBuf->v;
if (!pBuf->assign) {
*val = pData[start];
}
// AVX version to speedup the loop
if (tsAVXSupported && tsSIMDEnable) {
*val = floatVectorCmpAVX(pData, numOfRows, isMinFunc);
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(float) >= sizeof(__m256i)) {
floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
if (!pBuf->assign) {
*val = pData[start];
}
#else
if (true) {
#endif
if (isMinFunc) { // min
for (int32_t i = start; i < start + numOfRows; ++i) {
if (*val > pData[i]) {
*val = pData[i];
}
}
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
for (int32_t i = start; i < start + numOfRows; ++i) {
if (*val < pData[i]) {
*val = pData[i];
}
}
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
}
@ -540,27 +418,21 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
bool isMinFunc) {
double* pData = (double*)pCol->pData;
double* val = (double*)&pBuf->v;
if (!pBuf->assign) {
*val = pData[start];
}
// AVX version to speedup the loop
if (tsAVXSupported && tsSIMDEnable) {
*val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc);
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(double) >= sizeof(__m256i)) {
doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
if (!pBuf->assign) {
*val = pData[start];
}
#else
if (true) {
#endif
if (isMinFunc) { // min
for (int32_t i = start; i < start + numOfRows; ++i) {
if (*val > pData[i]) {
*val = pData[i];
}
}
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
for (int32_t i = start; i < start + numOfRows; ++i) {
if (*val < pData[i]) {
*val = pData[i];
}
}
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
}
@ -581,7 +453,7 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
}
static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunctionCtx* pCtx, SMinmaxResInfo* pBuf,
bool isMinFunc) {
bool isMinFunc) {
if (isMinFunc) {
switch (pCol->info.type) {
case TSDB_DATA_TYPE_BOOL:
@ -652,8 +524,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu
if (colDataIsNull_var(pCol, i)) {
continue;
}
char *pLeft = (char *)colDataGetData(pCol, i);
char *pRight = (char *)pBuf->str;
char* pLeft = (char*)colDataGetData(pCol, i);
char* pRight = (char*)pBuf->str;
int32_t ret = compareLenBinaryVal(pLeft, pRight);
if (ret < 0) {
@ -674,8 +546,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu
if (colDataIsNull_var(pCol, i)) {
continue;
}
char *pLeft = (char *)colDataGetData(pCol, i);
char *pRight = (char *)pBuf->str;
char* pLeft = (char*)colDataGetData(pCol, i);
char* pRight = (char*)pBuf->str;
int32_t ret = compareLenPrefixedWStr(pLeft, pRight);
if (ret < 0) {
@ -761,8 +633,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu
if (colDataIsNull_var(pCol, i)) {
continue;
}
char *pLeft = (char *)colDataGetData(pCol, i);
char *pRight = (char *)pBuf->str;
char* pLeft = (char*)colDataGetData(pCol, i);
char* pRight = (char*)pBuf->str;
int32_t ret = compareLenBinaryVal(pLeft, pRight);
if (ret > 0) {
@ -784,8 +656,8 @@ static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFu
if (colDataIsNull_var(pCol, i)) {
continue;
}
char *pLeft = (char *)colDataGetData(pCol, i);
char *pRight = (char *)pBuf->str;
char* pLeft = (char*)colDataGetData(pCol, i);
char* pRight = (char*)pBuf->str;
int32_t ret = compareLenPrefixedWStr(pLeft, pRight);
if (ret > 0) {
@ -838,7 +710,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
// data in current data block are qualified to the query
if (pInput->colDataSMAIsSet && !IS_STR_DATA_TYPE(type)) {
numOfElems = pInput->numOfRows - pAgg->numOfNull;
if (numOfElems == 0) {
goto _over;

View File

@ -37,7 +37,7 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
char *tsProcPath = NULL;
char tsSIMDEnable = 0;
char tsSIMDEnable = 1;
char tsAVX512Enable = 0;
char tsSSE42Supported = 0;
char tsAVXSupported = 0;

View File

@ -168,7 +168,7 @@ void startTrace() {
Dwarf_Ptr errarg = 0;
FILE *fp = fopen("/proc/self/maps", "r");
fscanf(fp, "%lx-", &addr);
ret = fscanf(fp, "%lx-", &addr);
fclose(fp);
ret = dwarf_init_path("/proc/self/exe", NULL, 0, DW_GROUPNUMBER_ANY, NULL, errarg, &tDbg, NULL);

View File

@ -64,29 +64,33 @@
#include "td_sz.h"
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type);
// delta
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressTimestampImp2(const char *const input, const int32_t nelements, char *const output,
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type);
// simple8b
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type);
// bit
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
char const type);
// double special
int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type);
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
char const type);
int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output);
int32_t l2ComressInitImpl_disabled(char *lossyColumns, float fPrecision, double dPrecision, uint32_t maxIntervals,
uint32_t intervals, int32_t ifAdtFse, const char *compressor) {
@ -457,8 +461,8 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t word_length = getWordLength(type);
if (word_length == -1) {
return word_length;
if (word_length < 0) {
return -1;
}
// If not compressed.
@ -467,70 +471,106 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
return nelements * word_length;
}
#if __AVX2__
tsDecompressIntImpl_Hw(input, nelements, output, type);
return nelements * word_length;
#else
#ifdef __AVX512F__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressIntImpl_Hw(input, nelements, output, type);
return nelements * word_length;
}
#endif
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
const char *ip = input + 1;
char *op = output;
int32_t count = 0;
int32_t _pos = 0;
int64_t prev_value = 0;
while (1) {
if (count == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
while (count < nelements) {
uint64_t w = *(uint64_t *)ip;
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
int32_t elems = selector_to_elems[(int32_t)selector];
for (int32_t i = 0; i < elems; i++) {
uint64_t zigzag_value;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
} else {
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
switch (type) {
case TSDB_DATA_TYPE_BIGINT: {
int64_t *out = (int64_t *)op;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
*out = prev_value;
}
} else {
uint64_t zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
*out = prev_value;
}
}
op = (char *)out;
break;
}
int64_t diff = ZIGZAG_DECODE(int64_t, zigzag_value);
int64_t curr_value = diff + prev_value;
prev_value = curr_value;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)output + _pos) = (int64_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)output + _pos) = (int32_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)output + _pos) = (int16_t)curr_value;
_pos++;
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)output + _pos) = (int8_t)curr_value;
_pos++;
break;
default:
perror("Wrong integer types.\n");
return -1;
case TSDB_DATA_TYPE_INT: {
int32_t *out = (int32_t *)op;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
*out = (int32_t)prev_value;
}
} else {
uint64_t zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
*out = (int32_t)prev_value;
}
}
op = (char *)out;
break;
}
count++;
if (count == nelements) break;
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *out = (int16_t *)op;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
*out = (int16_t)prev_value;
}
} else {
uint64_t zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
*out = (int16_t)prev_value;
}
}
op = (char *)out;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *out = (int8_t *)op;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
*out = (int8_t)prev_value;
}
} else {
uint64_t zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; ++i, ++count, ++out) {
zigzag_value = ((w >> (4 + bit * i)) & INT64MASK(bit));
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
*out = (int8_t)prev_value;
}
}
op = (char *)out;
break;
}
default:
perror("Wrong integer types.\n");
return -1;
}
ip += LONG_BYTES;
}
return nelements * word_length;
#endif
}
/* ----------------------------------------------Bool Compression ---------------------------------------------- */
@ -590,7 +630,8 @@ int32_t tsDecompressBoolImp(const char *const input, const int32_t nelements, ch
int32_t tsCompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
return tsCompressBoolImp(input, nelements, output);
}
int32_t tsDecompressBoolImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
int32_t tsDecompressBoolImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
char const type) {
return tsDecompressBoolImp(input, nelements, output);
}
@ -602,18 +643,20 @@ int32_t tsCompressDoubleImp2(const char *const input, const int32_t nelements, c
}
return TSDB_CODE_THIRDPARTY_ERROR;
}
int32_t tsDecompressDoubleImp2(const char *const input, const int32_t nelements, char *const output, char const type) {
int32_t tsDecompressDoubleImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
char const type) {
if (type == TSDB_DATA_TYPE_FLOAT) {
return tsDecompressFloatImp(input, nelements, output);
return tsDecompressFloatImp(input, ninput, nelements, output);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
return tsDecompressDoubleImp(input, nelements, output);
return tsDecompressDoubleImp(input, ninput, nelements, output);
}
return TSDB_CODE_THIRDPARTY_ERROR;
}
int32_t tsCompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsCompressINTImp(input, nelements, output, type);
}
int32_t tsDecompressINTImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t tsDecompressINTImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type) {
return tsDecompressINTImp(input, nelements, output, type);
}
@ -824,67 +867,68 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes;
} else if (input[0] == 1) { // Decompress
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
tsDecompressTimestampAvx512(input, nelements, output, false);
} else if (tsSIMDEnable && tsAVX2Supported) {
tsDecompressTimestampAvx2(input, nelements, output, false);
} else {
int64_t *ostream = (int64_t *)output;
#ifdef __AVX512VL__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
return nelements * longBytes;
}
#endif
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prev_value = 0;
int64_t prev_delta = 0;
int64_t delta_of_delta = 0;
int64_t *ostream = (int64_t *)output;
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prev_value = 0;
int64_t prev_delta = 0;
int64_t delta_of_delta = 0;
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
memcpy(&dd1, input + ipos, nbytes);
}
delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
}
ipos += nbytes;
if (opos == 0) {
prev_value = delta_of_delta;
prev_delta = 0;
ostream[opos++] = delta_of_delta;
} else {
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
}
if (opos == nelements) return nelements * longBytes;
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
ipos += nbytes;
if (opos == 0) {
prev_value = delta_of_delta;
prev_delta = 0;
ostream[opos++] = delta_of_delta;
} else {
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
if (opos == nelements) return nelements * longBytes;
}
if (opos == nelements) return nelements * longBytes;
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
if (opos == nelements) return nelements * longBytes;
}
}
@ -897,7 +941,8 @@ int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char
memcpy(output + 1, input, bytes);
return bytes + 1;
}
int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t tsDecompressPlain2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type) {
int32_t bytes = tDataTypes[type].bytes * nelements;
memcpy(output, input + 1, bytes);
return bytes;
@ -905,7 +950,7 @@ int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, cha
int32_t tsCompressTimestampImp2(const char *const input, const int32_t nelements, char *const output, const char type) {
return tsCompressTimestampImp(input, nelements, output);
}
int32_t tsDecompressTimestampImp2(const char *const input, const int32_t nelements, char *const output,
int32_t tsDecompressTimestampImp2(const char *const input, int32_t ninput, const int32_t nelements, char *const output,
const char type) {
return tsDecompressTimestampImp(input, nelements, output);
}
@ -1023,17 +1068,10 @@ FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const
return diff;
}
int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, char *const output) {
// output stream
double *ostream = (double *)output;
if (input[0] == 1) {
memcpy(output, input + 1, nelements * DOUBLE_BYTES);
return nelements * DOUBLE_BYTES;
}
static int32_t tsDecompressDoubleImpHelper(const char *input, int32_t nelements, char *output) {
double *ostream = (double *)output;
uint8_t flags = 0;
int32_t ipos = 1;
int32_t ipos = 0;
int32_t opos = 0;
uint64_t diff = 0;
union {
@ -1048,7 +1086,7 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements,
flags = input[ipos++];
}
diff = decodeDoubleValue(input, &ipos, flags & 0x0f);
diff = decodeDoubleValue(input, &ipos, flags & INT8MASK(4));
flags >>= 4;
curr.bits ^= diff;
@ -1058,6 +1096,25 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements,
return nelements * DOUBLE_BYTES;
}
/*
 * Decode a block of doubles produced by the lossless double compressor.
 *
 * input     - encoded block; input[0] is a one-byte header, the payload starts at input + 1
 * ninput    - size of the encoded block in bytes, used only to estimate the compression ratio
 * nelements - number of doubles to produce
 * output    - destination buffer, receives nelements * DOUBLE_BYTES bytes
 *
 * Returns the value of the selected decoder, or nelements * DOUBLE_BYTES when the block was stored raw.
 */
int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
  const char *payload = input + 1;

  // A header byte of 1 marks a block that was stored without compression: copy it through.
  if (input[0] == 1) {
    memcpy(output, payload, nelements * DOUBLE_BYTES);
    return nelements * DOUBLE_BYTES;
  }

#ifdef __AVX2__
  // The AVX2 decoder is chosen only when SIMD is enabled and supported at run time
  // and the block compressed to more than half of its raw size (ratio below 2).
  if (tsSIMDEnable && tsAVX2Supported) {
    double ratio = 1.0 * nelements * DOUBLE_BYTES / ninput;
    if (ratio < 2) {
      return tsDecompressDoubleImpAvx2(payload, nelements, output);
    }
  }
#endif

  // Scalar decoder: the default path.
  return tsDecompressDoubleImpHelper(payload, nelements, output);
}
/* --------------------------------------------Float Compression ---------------------------------------------- */
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
uint8_t nbytes = (flag & INT8MASK(3)) + 1;
@ -1166,49 +1223,50 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t
return diff;
}
static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float *ostream) {
static int32_t tsDecompressFloatImpHelper(const char *input, int32_t nelements, char *output) {
float *ostream = (float *)output;
uint8_t flags = 0;
int32_t ipos = 1;
int32_t ipos = 0;
int32_t opos = 0;
uint32_t prev_value = 0;
uint32_t diff = 0;
union {
uint32_t bits;
float real;
} curr;
curr.bits = 0;
for (int32_t i = 0; i < nelements; i++) {
if (i % 2 == 0) {
flags = input[ipos++];
}
uint8_t flag = flags & INT8MASK(4);
diff = decodeFloatValue(input, &ipos, flags & INT8MASK(4));
flags >>= 4;
uint32_t diff = decodeFloatValue(input, &ipos, flag);
union {
uint32_t bits;
float real;
} curr;
uint32_t predicted = prev_value;
curr.bits = predicted ^ diff;
prev_value = curr.bits;
curr.bits ^= diff;
ostream[opos++] = curr.real;
}
return nelements * FLOAT_BYTES;
}
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int32_t nelements, char *const output) {
if (input[0] == 1) {
memcpy(output, input + 1, nelements * FLOAT_BYTES);
return nelements * FLOAT_BYTES;
}
if (tsSIMDEnable && tsAVX2Supported) {
tsDecompressFloatImplAvx2(input, nelements, output);
} else if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
tsDecompressFloatImplAvx512(input, nelements, output);
} else { // alternative implementation without SIMD instructions.
tsDecompressFloatHelper(input, nelements, (float *)output);
#ifdef __AVX2__
// use AVX2 implementation when allowed and the compression ratio is not high
double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput;
if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
return tsDecompressFloatImpAvx2(input + 1, nelements, output);
}
#endif
return nelements * FLOAT_BYTES;
// use implementation without SIMD instructions by default
return tsDecompressFloatImpHelper(input + 1, nelements, output);
}
//
@ -1336,10 +1394,11 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
} else {
// decompress lossless
if (cmprAlg == ONE_STAGE_COMP) {
return tsDecompressFloatImp(pIn, nEle, pOut);
return tsDecompressFloatImp(pIn, nIn, nEle, pOut);
} else if (cmprAlg == TWO_STAGE_COMP) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressFloatImp(pBuf, nEle, pOut);
int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
if (bufLen < 0) return -1;
return tsDecompressFloatImp(pBuf, bufLen, nEle, pOut);
} else {
return TSDB_CODE_INVALID_PARA;
}
@ -1373,10 +1432,11 @@ int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
} else {
// decompress lossless
if (cmprAlg == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(pIn, nEle, pOut);
return tsDecompressDoubleImp(pIn, nIn, nEle, pOut);
} else if (cmprAlg == TWO_STAGE_COMP) {
if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1;
return tsDecompressDoubleImp(pBuf, nEle, pOut);
int32_t bufLen = tsDecompressStringImp(pIn, nIn, pBuf, nBuf);
if (bufLen < 0) return -1;
return tsDecompressDoubleImp(pBuf, bufLen, nEle, pOut);
} else {
return TSDB_CODE_INVALID_PARA;
}
@ -1550,7 +1610,7 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
} else { \
uTrace("dencode:%s, compress:%s, level:%s, type:%s", compressL1Dict[l1].name, "disabled", "disabled", \
tDataTypes[type].name); \
return compressL1Dict[l1].decomprFn(pIn, nEle, pOut, type); \
return compressL1Dict[l1].decomprFn(pIn, nIn, nEle, pOut, type); \
} \
} else if (l1 != L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
@ -1562,8 +1622,9 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
} else { \
uTrace("dencode:%s, decompress:%s, level:%d, type:%s", compressL1Dict[l1].name, compressL2Dict[l2].name, lvl, \
tDataTypes[type].name); \
if (compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type) < 0) return -1; \
return compressL1Dict[l1].decomprFn(pBuf, nEle, pOut, type); \
int32_t bufLen = compressL2Dict[l2].decomprFn(pIn, nIn, pBuf, nBuf, type); \
if (bufLen < 0) return -1; \
return compressL1Dict[l1].decomprFn(pBuf, bufLen, nEle, pOut, type); \
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \

View File

@ -40,6 +40,7 @@ int32_t getWordLength(char type) {
return wordLength;
}
#ifdef __AVX2__
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t word_length = getWordLength(type);
@ -52,7 +53,6 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int32_t _pos = 0;
int64_t prevValue = 0;
#if __AVX2__ || __AVX512F__
while (_pos < nelements) {
uint64_t w = *(uint64_t *)ip;
@ -309,22 +309,145 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
ip += LONG_BYTES;
}
#endif
return nelements * word_length;
}
void tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) {
#if __AVX512F__
// todo add it
#endif
return;
#define M256_BYTES sizeof(__m256i)
// Decode one batch of eight packed 32-bit diffs.
//
// data - 32 bytes holding eight 32-bit lanes of raw payload bytes
// flag - 32 bytes holding the matching per-lane flag values
// Both buffers are read with _mm256_load_si256, so they must be 32-byte aligned.
//
// Returns the eight lanes after the payload bytes have been moved into place and every
// other byte of the lane has been shifted out (zeroed).
// NOTE(review): this relies on the layout written by tsDecompressFloatImpAvx2 in this file:
// even lanes carry their payload in the highest-addressed bytes, odd lanes in the lowest.
FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
// Mask for the low three flag bits.
__m256i k7 = _mm256_set1_epi32(7);
// _mm256_set_epi32 lists lanes from highest to lowest: lopart selects lanes 0/2/4/6,
// hipart selects lanes 1/3/5/7.
__m256i lopart = _mm256_set_epi32(0, -1, 0, -1, 0, -1, 0, -1);
__m256i hipart = _mm256_set_epi32(-1, 0, -1, 0, -1, 0, -1, 0);
// trTail: lanes whose flag is greater than 7; trHead: all remaining lanes.
__m256i trTail = _mm256_cmpgt_epi32(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi32(-1));
// Per-lane shift in bits: (3 - (flag & 7)) * 8.
__m256i shiftVec = _mm256_slli_epi32(_mm256_sub_epi32(_mm256_set1_epi32(3), _mm256_and_si256(flagVec, k7)), 3);
// Step 1: odd lanes only - shift left so the payload sits in the top bytes of the lane.
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi32(dataVec, _mm256_and_si256(shiftVec, maskVec));
// Step 2: every even lane, plus odd trHead lanes - logical shift right so the payload sits
// in the bottom bytes of the lane.
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi32(diffVec, _mm256_and_si256(shiftVec, maskVec));
// Step 3: even trTail lanes - shift left again so the payload ends in the top bytes.
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi32(diffVec, _mm256_and_si256(shiftVec, maskVec));
// Net effect: trHead lanes hold the payload in their low bytes, trTail lanes in their high bytes.
return diffVec;
}
// todo add later
void tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) {
#if __AVX2__
#endif
return;
// Decode nelements floats with AVX2.
//
// input     - encoded stream positioned at the first flag byte
// nelements - number of float values to produce
// output    - destination; written with unaligned stores, so no alignment is required
//
// Returns the number of bytes written to output (out - output).
//
// Stream layout as consumed here: one header byte carrying two 4-bit flags (low nibble for
// the first value, high nibble for the second), followed by nbytes1 + nbytes2 payload bytes,
// where nbytesN = (flagN & 0x7) + 1.
// NOTE(review): for an odd nelements the last pair still reads a second flag nibble and its
// payload bytes - presumably the encoder pads the stream; confirm against tsCompressFloatImp.
// NOTE(review): assumes well-formed input keeps each payload within one FLOAT_BYTES lane;
// nothing here bounds nbytes1/nbytes2 against the lane size - TODO confirm.
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output) {
// Scratch space of three vector widths: after rounding the start up with ALIGN_NUM there is
// still room for one aligned data vector followed by one flag vector.
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
char *data = (char *)ALIGN_NUM((uint64_t)buf, M256_BYTES);
char *flag = data + M256_BYTES;
const char *in = input;
char *out = output;
// Load data into the buffer for batch processing
int32_t batchSize = M256_BYTES / FLOAT_BYTES;
// idx: number of lanes currently filled in the scratch buffer.
int32_t idx = 0;
// cur: bit pattern of the previously decoded value; each decoded diff is XORed into it.
uint32_t cur = 0;
// Each iteration consumes one header byte, i.e. two values.
for (int32_t i = 0; i < nelements; i += 2) {
if (idx == batchSize) {
// Start processing when the buffer is full
__m256i resVec = decodeFloatAvx2(data, flag);
_mm256_storeu_si256((__m256i *)out, resVec);
// Turn the stored diffs into values in place with a running XOR.
uint32_t *p = (uint32_t *)out;
for (int32_t j = 0; j < batchSize; ++j) {
p[j] = cur = (p[j] ^ cur);
}
out += M256_BYTES;
idx = 0;
}
uint8_t flag1 = (*in) & 0xF;
uint8_t flag2 = ((*in) >> 4) & 0xF;
int32_t nbytes1 = (flag1 & 0x7) + 1;
int32_t nbytes2 = (flag2 & 0x7) + 1;
in++;
// Only the first byte of each flag lane is written; the rest stays zero from the memset.
flag[idx * FLOAT_BYTES] = flag1;
flag[(idx + 1) * FLOAT_BYTES] = flag2;
// One copy places both payloads: the first ends exactly at the end of lane idx,
// the second starts at the beginning of lane idx + 1.
memcpy(data + (idx + 1) * FLOAT_BYTES - nbytes1, in, nbytes1 + nbytes2);
in += nbytes1 + nbytes2;
idx += 2;
}
if (idx) {
// For an odd nelements the last filled lane is not a real value: drop it.
idx -= (nelements & 0x1);
// Process the remaining few bytes
__m256i resVec = decodeFloatAvx2(data, flag);
// Copy only the idx valid lanes so nothing is written past the requested output.
memcpy(out, &resVec, idx * FLOAT_BYTES);
uint32_t *p = (uint32_t *)out;
for (int32_t j = 0; j < idx; ++j) {
p[j] = cur = (p[j] ^ cur);
}
out += idx * FLOAT_BYTES;
}
return (int32_t)(out - output);
}
// Decode one batch of four packed 64-bit diffs.
//
// data - 32 bytes holding four 64-bit lanes of raw payload bytes
// flag - 32 bytes holding the matching per-lane flag values
// Both buffers are read with _mm256_load_si256, so they must be 32-byte aligned.
//
// Returns the four lanes after the payload bytes have been moved into place and every
// other byte of the lane has been shifted out (zeroed).
// NOTE(review): this relies on the layout written by tsDecompressDoubleImpAvx2 in this file:
// even lanes carry their payload in the highest-addressed bytes, odd lanes in the lowest.
FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
// Mask for the low three flag bits.
__m256i k7 = _mm256_set1_epi64x(7);
// _mm256_set_epi64x lists lanes from highest to lowest: lopart selects lanes 0/2,
// hipart selects lanes 1/3.
__m256i lopart = _mm256_set_epi64x(0, -1, 0, -1);
__m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0);
// trTail: lanes whose flag is greater than 7; trHead: all remaining lanes.
__m256i trTail = _mm256_cmpgt_epi64(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1));
// Per-lane shift in bits: (7 - (flag & 7)) * 8.
__m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3);
// Step 1: odd lanes only - shift left so the payload sits in the top bytes of the lane.
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi64(dataVec, _mm256_and_si256(shiftVec, maskVec));
// Step 2: every even lane, plus odd trHead lanes - logical shift right so the payload sits
// in the bottom bytes of the lane.
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
// Step 3: even trTail lanes - shift left again so the payload ends in the top bytes.
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
// Net effect: trHead lanes hold the payload in their low bytes, trTail lanes in their high bytes.
return diffVec;
}
// Decode nelements doubles with AVX2.
//
// input     - encoded stream positioned at the first flag byte
// nelements - number of double values to produce
// output    - destination; written with unaligned stores, so no alignment is required
//
// Returns the number of bytes written to output (out - output).
//
// Stream layout as consumed here: one header byte carrying two 4-bit flags (low nibble for
// the first value, high nibble for the second), followed by nbytes1 + nbytes2 payload bytes,
// where nbytesN = (flagN & 0x7) + 1.
// NOTE(review): for an odd nelements the last pair still reads a second flag nibble and its
// payload bytes - presumably the encoder pads the stream; confirm against tsCompressDoubleImp.
int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, char *const output) {
// Scratch space of three vector widths: after rounding the start up with ALIGN_NUM there is
// still room for one aligned data vector followed by one flag vector.
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
char *data = (char *)ALIGN_NUM((uint64_t)buf, M256_BYTES);
char *flag = data + M256_BYTES;
const char *in = input;
char *out = output;
// Load data into the buffer for batch processing
int32_t batchSize = M256_BYTES / DOUBLE_BYTES;
// idx: number of lanes currently filled in the scratch buffer.
int32_t idx = 0;
// cur: bit pattern of the previously decoded value; each decoded diff is XORed into it.
uint64_t cur = 0;
// Each iteration consumes one header byte, i.e. two values.
for (int32_t i = 0; i < nelements; i += 2) {
if (idx == batchSize) {
// Start processing when the buffer is full
__m256i resVec = decodeDoubleAvx2(data, flag);
_mm256_storeu_si256((__m256i *)out, resVec);
// Turn the stored diffs into values in place with a running XOR.
uint64_t *p = (uint64_t *)out;
for (int32_t j = 0; j < batchSize; ++j) {
p[j] = cur = (p[j] ^ cur);
}
out += M256_BYTES;
idx = 0;
}
uint8_t flag1 = (*in) & 0xF;
uint8_t flag2 = ((*in) >> 4) & 0xF;
int32_t nbytes1 = (flag1 & 0x7) + 1;
int32_t nbytes2 = (flag2 & 0x7) + 1;
in++;
// Only the first byte of each flag lane is written; the rest stays zero from the memset.
flag[idx * DOUBLE_BYTES] = flag1;
flag[(idx + 1) * DOUBLE_BYTES] = flag2;
// One copy places both payloads: the first ends exactly at the end of lane idx,
// the second starts at the beginning of lane idx + 1.
memcpy(data + (idx + 1) * DOUBLE_BYTES - nbytes1, in, nbytes1 + nbytes2);
in += nbytes1 + nbytes2;
idx += 2;
}
if (idx) {
// For an odd nelements the last filled lane is not a real value: drop it.
idx -= (nelements & 0x1);
// Process the remaining few bytes
__m256i resVec = decodeDoubleAvx2(data, flag);
// Copy only the idx valid lanes so nothing is written past the requested output.
memcpy(out, &resVec, idx * DOUBLE_BYTES);
uint64_t *p = (uint64_t *)out;
for (int32_t j = 0; j < idx; ++j) {
p[j] = cur = (p[j] ^ cur);
}
out += idx * DOUBLE_BYTES;
}
return (int32_t)(out - output);
}
// decode two timestamps in one loop.
@ -332,7 +455,6 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements,
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
#if __AVX2__
__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();
@ -464,17 +586,16 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements,
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}
#endif
return;
}
#endif
#if __AVX512VL__
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
#if __AVX512VL__
__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();
@ -579,6 +700,6 @@ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelement
}
}
#endif
return;
}
#endif

View File

@ -126,12 +126,12 @@ add_test(
COMMAND regexTest
)
# decompressTest: gtest-based executable built from decompressTest.cpp and registered with CTest.
add_executable(decompressTest "decompressTest.cpp")
# PRIVATE: the libraries are needed to build the test binary only; nothing links against it.
target_link_libraries(decompressTest PRIVATE os util common gtest_main)
add_test(
    NAME decompressTest
    COMMAND decompressTest
)
if (${TD_LINUX})
# terrorTest

View File

@ -1,14 +1,12 @@
#define ALLOW_FORBID_FUNC
#include <gtest/gtest.h>
#include <stdlib.h>
#include <tcompression.h>
#include <chrono>
#include <random>
#include "ttypes.h"
namespace {
} // namespace
TEST(utilTest, decompress_ts_test) {
TEST(utilTest, DISABLED_decompress_ts_test) {
{
tsSIMDEnable = 1;
tsAVX2Supported = 1;
@ -29,6 +27,7 @@ TEST(utilTest, decompress_ts_test) {
std::cout << ((int64_t*)decompOutput)[i] << std::endl;
}
#ifdef __AVX512VL__
memset(decompOutput, 0, 10 * 8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 10,
reinterpret_cast<char* const>(decompOutput), false);
@ -36,13 +35,16 @@ TEST(utilTest, decompress_ts_test) {
for (int32_t i = 0; i < 10; ++i) {
std::cout << ((int64_t*)decompOutput)[i] << std::endl;
}
#endif
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
tsList[0] = 1286; tsList[1] = 1124; tsList[2]=2681; tsList[3] = 2823;
tsList[0] = 1286;
tsList[1] = 1124;
tsList[2] = 2681;
tsList[3] = 2823;
// char* pOutput[4 * sizeof(int64_t)] = {0};
len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4,
ONE_STAGE_COMP, NULL, 0);
len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 4, ONE_STAGE_COMP,
NULL, 0);
decompOutput[4 * 8] = {0};
tsDecompressTimestamp(pOutput, len, 4, decompOutput, sizeof(int64_t) * 4, ONE_STAGE_COMP, NULL, 0);
@ -56,6 +58,7 @@ TEST(utilTest, decompress_ts_test) {
int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7,
ONE_STAGE_COMP, NULL, 0);
#ifdef __AVX512VL__
memset(decompOutput, 0, 10 * 8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 7,
reinterpret_cast<char* const>(decompOutput), false);
@ -63,12 +66,14 @@ TEST(utilTest, decompress_ts_test) {
for (int32_t i = 0; i < 7; ++i) {
std::cout << ((int64_t*)decompOutput)[i] << std::endl;
}
#endif
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int64_t tsList2[1] = {1700000000};
int32_t len2 = tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1,
ONE_STAGE_COMP, NULL, 0);
#ifdef __AVX512VL__
memset(decompOutput, 0, 10 * 8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 1,
reinterpret_cast<char* const>(decompOutput), false);
@ -76,52 +81,10 @@ TEST(utilTest, decompress_ts_test) {
for (int32_t i = 0; i < 1; ++i) {
std::cout << ((int64_t*)decompOutput)[i] << std::endl;
}
#endif
}
TEST(utilTest, decompress_bigint_avx2_test) {
{
tsSIMDEnable = 1;
tsAVX2Supported = 1;
}
int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400,
1700000500, 1700000600, 1700000700, 1700000800, 1700000900};
char* pOutput[10 * sizeof(int64_t)] = {0};
int32_t len = tsCompressBigint(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10,
ONE_STAGE_COMP, NULL, 0);
char* decompOutput[10 * 8] = {0};
tsDecompressBigint(pOutput, len, 10, decompOutput, sizeof(int64_t) * 10, ONE_STAGE_COMP, NULL, 0);
for (int32_t i = 0; i < 10; ++i) {
std::cout << ((int64_t*)decompOutput)[i] << std::endl;
}
}
TEST(utilTest, decompress_int_avx2_test) {
{
tsSIMDEnable = 1;
tsAVX2Supported = 1;
}
int32_t tsList[10] = {17000000, 17000001, 17000002, 17000003, 17000004,
17000005, 17000006, 17000007, 17000008, 17000009};
char* pOutput[10 * sizeof(int32_t)] = {0};
int32_t len =
tsCompressInt(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0);
char* decompOutput[10 * 8] = {0};
tsDecompressInt(pOutput, len, 10, decompOutput, sizeof(int32_t) * 10, ONE_STAGE_COMP, NULL, 0);
for (int32_t i = 0; i < 10; ++i) {
std::cout << ((int32_t*)decompOutput)[i] << std::endl;
}
}
TEST(utilTest, decompress_perf_test) {
TEST(utilTest, DISABLED_decompress_perf_test) {
int32_t num = 10000;
int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
@ -149,9 +112,11 @@ TEST(utilTest, decompress_perf_test) {
memset(pOutput, 0, num * sizeof(int64_t));
st = taosGetTimestampUs();
#ifdef __AVX512VL__
for (int32_t k = 0; k < 10000; ++k) {
tsDecompressTimestampAvx512(px, num, pOutput, false);
}
#endif
int64_t el2 = taosGetTimestampUs() - st;
std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl;
@ -303,7 +268,7 @@ void* genCompressData_float(int32_t type, int32_t num, bool order) {
}
return pBuf;
}
TEST(utilTest, compressAlg) {
TEST(utilTest, DISABLED_compressAlg) {
int32_t num = 4096;
int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
int64_t iniVal = 17000;
@ -480,3 +445,213 @@ TEST(utilTest, compressAlg) {
taosMemoryFree(p);
}
}
// Seed shared by the random-data generators of the decompression tests.
static uint32_t decompressRandomSeed;

// Picks a fresh seed from std::random_device and prints it, so the value used
// by a test run is visible in the test output.
static void refreshSeed() {
  std::random_device entropySource;
  decompressRandomSeed = entropySource();
  std::cout << "Refresh random seed to " << decompressRandomSeed << "\n";
}
#ifdef __GNUC__
// Generates `n` uniformly distributed integers in [min, max], seeded with
// decompressRandomSeed.
//
// std::uniform_int_distribution is only specified for short, int, long,
// long long and their unsigned variants. One-byte element types (e.g. int8_t)
// are therefore drawn through a `short`/`unsigned short` distribution and cast
// back, instead of instantiating the distribution with a character type.
template <typename T>
static std::vector<typename std::enable_if<std::is_integral<T>::value, T>::type> utilTestRandomData(int32_t n, T min,
                                                                                                    T max) {
  using DistT =
      typename std::conditional<(sizeof(T) < sizeof(short)),
                                typename std::conditional<std::is_signed<T>::value, short, unsigned short>::type,
                                T>::type;
  std::mt19937                         gen(decompressRandomSeed);
  std::vector<T>                       data(n);
  std::uniform_int_distribution<DistT> dist(min, max);
  // Every drawn value lies in [min, max], so the cast back to T is lossless.
  for (auto& v : data) v = static_cast<T>(dist(gen));
  return data;
}

// Generates `n` uniformly distributed floating-point values in [min, max),
// seeded with decompressRandomSeed.
template <typename T>
static std::vector<typename std::enable_if<std::is_floating_point<T>::value, T>::type> utilTestRandomData(int32_t n,
                                                                                                          T       min,
                                                                                                          T       max) {
  std::mt19937                      gen(decompressRandomSeed);
  std::vector<T>                    data(n);
  std::uniform_real_distribution<T> dist(min, max);
  for (auto& v : data) v = dist(gen);
  return data;
}
#else
// Fallback when __GNUC__ is not defined: a deterministic sequence that
// alternates between `max` (even indices) and `min` (odd indices).
template <typename T>
static std::vector<T> utilTestRandomData(int32_t n, T min, T max) {
  std::vector<T> data(n);
  for (int32_t i = 0; i < n; ++i) {
    data[i] = (i & 0x1) ? min : max;
  }
  return data;
}
#endif
// Invokes `func` `nround` times back to back and returns the total wall-clock
// time in milliseconds (measured at microsecond resolution).
template <typename F>
static double measureRunTime(const F& func, int32_t nround = 1) {
  using Clock = std::chrono::high_resolution_clock;

  const Clock::time_point begin = Clock::now();
  for (int32_t round = 0; round < nround; ++round) {
    func();
  }
  const Clock::time_point finish = Clock::now();

  const auto elapsedUs = std::chrono::duration_cast<std::chrono::microseconds>(finish - begin).count();
  return elapsedUs / 1000.0;
}
// Compile-time trait: `value` is true for the element types whose
// decompression tests also exercise the AVX2 code path (float and double),
// and false for every other type.
template <typename T>
struct DataTypeSupportAvx {
  static constexpr bool value = false;
};

template <>
struct DataTypeSupportAvx<float> {
  static constexpr bool value = true;
};

template <>
struct DataTypeSupportAvx<double> {
  static constexpr bool value = true;
};
template <typename T, typename CompF, typename DecompF>
static void decompressBasicTest(size_t dataSize, const CompF& compress, const DecompF& decompress, T min, T max) {
auto origData = utilTestRandomData(dataSize, min, max);
std::vector<char> compData(origData.size() * sizeof(origData[0]) + 1);
int32_t cnt = compress(origData.data(), origData.size(), origData.size(), compData.data(), compData.size(),
ONE_STAGE_COMP, nullptr, 0);
ASSERT_LE(cnt, compData.size());
decltype(origData) decompData(origData.size());
// test simple implementation without SIMD instructions
tsSIMDEnable = 0;
cnt = decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(),
ONE_STAGE_COMP, nullptr, 0);
ASSERT_EQ(cnt, compData.size() - 1);
EXPECT_EQ(origData, decompData);
#ifdef __AVX2__
if (DataTypeSupportAvx<T>::value) {
// test AVX2 implementation
tsSIMDEnable = 1;
tsAVX2Supported = 1;
cnt = decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(),
ONE_STAGE_COMP, nullptr, 0);
ASSERT_EQ(cnt, compData.size() - 1);
EXPECT_EQ(origData, decompData);
}
#endif
}
template <typename T, typename CompF, typename DecompF>
static void decompressPerfTest(const char* typname, const CompF& compress, const DecompF& decompress, T min, T max) {
constexpr size_t DATA_SIZE = 1 * 1024 * 1024;
constexpr int32_t NROUND = 1000;
auto origData = utilTestRandomData(DATA_SIZE, min, max);
std::vector<char> compData(origData.size() * sizeof(origData[0]) + 1);
int32_t cnt = compress(origData.data(), origData.size(), origData.size(), compData.data(), compData.size(),
ONE_STAGE_COMP, nullptr, 0);
ASSERT_LE(cnt, compData.size());
if (compData[0] == 1) std::cout << "NOT COMPRESSED!\n";
std::cout << "Original size: " << compData.size() - 1 << "; Compressed size: " << cnt
<< "; Compression ratio: " << 1.0 * (compData.size() - 1) / cnt << "\n";
decltype(origData) decompData(origData.size());
tsSIMDEnable = 0;
auto ms = measureRunTime(
[&]() {
decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(),
ONE_STAGE_COMP, nullptr, 0);
},
NROUND);
std::cout << "Decompression of " << NROUND * DATA_SIZE << " " << typname << " without SIMD costs " << ms
<< " ms, avg speed: " << NROUND * DATA_SIZE * 1000 / ms << " tuples/s\n";
#ifdef __AVX2__
if (DataTypeSupportAvx<T>::value) {
tsSIMDEnable = 1;
tsAVX2Supported = 1;
ms = measureRunTime(
[&]() {
decompress(compData.data(), compData.size(), decompData.size(), decompData.data(), decompData.size(),
ONE_STAGE_COMP, nullptr, 0);
},
NROUND);
std::cout << "Decompression of " << NROUND * DATA_SIZE << " " << typname << " using AVX2 costs " << ms
<< " ms, avg speed: " << NROUND * DATA_SIZE * 1000 / ms << " tuples/s\n";
}
#endif
}
// Refreshes the random seed, then runs decompressPerfTest for element type
// `typname`; the type name is stringified (#typname) and passed as the label.
// `comp` / `decomp` are the compress / decompress functions under test and
// `min` / `max` are forwarded as the value range.
// Wrapped in do { ... } while (0) so the macro expands to a single statement.
#define RUN_PERF_TEST(typname, comp, decomp, min, max) \
do { \
refreshSeed(); \
decompressPerfTest<typname>(#typname, comp, decomp, min, max); \
} while (0)
// Round-trips random int8_t data in [0, 100] for every element count from 1
// to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressTinyintBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<int8_t>(nRows, tsCompressTinyint, tsDecompressTinyint, 0, 100);
  }
}
// Decompression benchmark for int8_t data in [0, 100].
TEST(utilTest, decompressTinyintPerf) {
  RUN_PERF_TEST(int8_t, tsCompressTinyint, tsDecompressTinyint, 0, 100);
}
// Round-trips random int16_t data in [0, 10000] for every element count from
// 1 to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressSmallintBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<int16_t>(nRows, tsCompressSmallint, tsDecompressSmallint, 0, 10000);
  }
}
// Decompression benchmark for int16_t data in [0, 10000].
TEST(utilTest, decompressSmallintPerf) {
  RUN_PERF_TEST(int16_t, tsCompressSmallint, tsDecompressSmallint, 0, 10000);
}
// Round-trips random int32_t data in [0, 1000000] for every element count
// from 1 to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressIntBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<int32_t>(nRows, tsCompressInt, tsDecompressInt, 0, 1000000);
  }
}
// Decompression benchmark for int32_t data in [0, 1000000].
TEST(utilTest, decompressIntPerf) {
  RUN_PERF_TEST(int32_t, tsCompressInt, tsDecompressInt, 0, 1000000);
}
// Round-trips random int64_t data in [0, 1000000000] for every element count
// from 1 to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressBigintBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<int64_t>(nRows, tsCompressBigint, tsDecompressBigint, 0, 1000000000L);
  }
}
// Decompression benchmark for int64_t data in [0, 1000000000].
TEST(utilTest, decompressBigintPerf) {
  RUN_PERF_TEST(int64_t, tsCompressBigint, tsDecompressBigint, 0, 1000000000L);
}
// Round-trips random float data drawn with bounds 0 and 99999 for every
// element count from 1 to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressFloatBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<float>(nRows, tsCompressFloat, tsDecompressFloat, 0, 99999);
  }
}
// Decompression benchmark for float data drawn with bounds 0 and 99999.
TEST(utilTest, decompressFloatPerf) {
  RUN_PERF_TEST(float, tsCompressFloat, tsDecompressFloat, 0, 99999);
}
// Round-trips random double data drawn with bounds 0 and 9999999999 for every
// element count from 1 to 4096, all generated from one freshly drawn seed.
TEST(utilTest, decompressDoubleBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<double>(nRows, tsCompressDouble, tsDecompressDouble, 0, 9999999999);
  }
}
// Decompression benchmark for double data drawn with bounds 0 and 9999999999.
TEST(utilTest, decompressDoublePerf) {
  RUN_PERF_TEST(double, tsCompressDouble, tsDecompressDouble, 0, 9999999999);
}
// Round-trips random int64_t data in [0, 1000000000] through the timestamp
// codec for every element count from 1 to 4096, all generated from one
// freshly drawn seed.
TEST(utilTest, decompressTimestampBasic) {
  constexpr int32_t kMaxRows = 4096;
  refreshSeed();
  for (int32_t nRows = 1; nRows <= kMaxRows; ++nRows) {
    decompressBasicTest<int64_t>(nRows, tsCompressTimestamp, tsDecompressTimestamp, 0, 1000000000L);
  }
}
// Decompression benchmark for the timestamp codec on int64_t data in
// [0, 1000000000]. Calls decompressPerfTest directly so the output label reads
// "timestamp" rather than the element type name.
TEST(utilTest, decompressTimestampPerf) {
  const char* label = "timestamp";
  refreshSeed();
  decompressPerfTest<int64_t>(label, tsCompressTimestamp, tsDecompressTimestamp, 0, 1000000000L);
}