From 9914e327d126a1edbacf2d6c9ca9bf3079b6fad8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Nov 2023 00:43:13 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/util/tcompression.h | 11 ++ include/util/tdef.h | 1 - source/util/src/tcompression.c | 307 ++++++--------------------------- source/util/src/tdecompress.c | 249 ++++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 251 deletions(-) create mode 100644 source/util/src/tdecompress.c diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 3a3d13117e..e5a4ed36ff 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -30,6 +30,10 @@ extern "C" { #define INT64MASK(_x) ((((uint64_t)1) << _x) - 1) #define INT32MASK(_x) (((uint32_t)1 << _x) - 1) #define INT8MASK(_x) (((uint8_t)1 << _x) - 1) + +#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode +#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode + // Compression algorithm #define NO_COMPRESSION 0 #define ONE_STAGE_COMP 1 @@ -129,6 +133,13 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 int32_t nBuf); int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf); +// for internal usage +int32_t getWordLength(char type); + +int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); +void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream); /************************************************************************* * STREAM COMPRESSION diff --git a/include/util/tdef.h b/include/util/tdef.h index 1b56b5b623..69d0c1126d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -34,7 +34,6 @@ extern "C" { // Bytes for each type. extern const int32_t TYPE_BYTES[21]; -// TODO: replace and remove code below #define CHAR_BYTES sizeof(char) #define SHORT_BYTES sizeof(int16_t) #define INT_BYTES sizeof(int32_t) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index dc89a24180..6d3ff8c070 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -52,6 +52,7 @@ #include "lz4.h" #include "tRealloc.h" #include "tlog.h" +#include "ttypes.h" #ifdef TD_TSZ #include "td_sz.h" @@ -62,8 +63,6 @@ static const int32_t TEST_NUMBER = 1; #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) #define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) -#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode -#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode #ifdef TD_TSZ bool lossyFloat = false; @@ -99,24 +98,7 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; // get the byte limit. - int32_t word_length = 0; - switch (type) { - case TSDB_DATA_TYPE_BIGINT: - word_length = LONG_BYTES; - break; - case TSDB_DATA_TYPE_INT: - word_length = INT_BYTES; - break; - case TSDB_DATA_TYPE_SMALLINT: - word_length = SHORT_BYTES; - break; - case TSDB_DATA_TYPE_TINYINT: - word_length = CHAR_BYTES; - break; - default: - uError("Invalid compress integer type:%d", type); - return -1; - } + int32_t word_length = getWordLength(type); int32_t byte_limit = nelements * word_length + 1; int32_t opos = 1; @@ -221,24 +203,9 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char } int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) { - - int32_t word_length = 0; - switch (type) { - case TSDB_DATA_TYPE_BIGINT: - word_length = LONG_BYTES; - break; - case TSDB_DATA_TYPE_INT: - word_length = INT_BYTES; - break; - case TSDB_DATA_TYPE_SMALLINT: - word_length = SHORT_BYTES; - break; - case TSDB_DATA_TYPE_TINYINT: - word_length = CHAR_BYTES; - break; - default: - uError("Invalid decompress integer type:%d", type); - return -1; + int32_t word_length = getWordLength(type); + if (word_length == -1) { + return word_length; } // If not compressed. @@ -247,8 +214,11 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha return nelements * word_length; } - // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 - // 12 13 14 15 +#if __AVX2__ + tsDecompressIntImpl_Hw(input, nelements, output, type); + return nelements * word_length; +#else + // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; @@ -257,185 +227,6 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha int32_t _pos = 0; int64_t prev_value = 0; -#if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); - - char selector = (char)(w & INT64MASK(4)); // selector = 4 - char bit = bit_per_integer[(int32_t)selector]; // bit = 3 - int32_t elems = selector_to_elems[(int32_t)selector]; - - // Optimize the performance, by remove the constantly switch operation. - int32_t v = 4; - uint64_t zigzag_value = 0; - uint64_t mask = INT64MASK(bit); - - switch (type) { - case TSDB_DATA_TYPE_BIGINT: { - int64_t* p = (int64_t*) output; - - int32_t gRemainder = (nelements - _pos); - int32_t num = (gRemainder > elems)? elems:gRemainder; - - int32_t batch = num >> 2; - int32_t remain = num & 0x03; - if (selector == 0 || selector == 1) { - if (tsAVX2Enable && tsSIMDEnable) { - for (int32_t i = 0; i < batch; ++i) { - __m256i prev = _mm256_set1_epi64x(prev_value); - _mm256_storeu_si256((__m256i *)&p[_pos], prev); - _pos += 4; - } - - for (int32_t i = 0; i < remain; ++i) { - p[_pos++] = prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = prev_value; - v += bit; - } - } - } else { - if (tsAVX2Enable && tsSIMDEnable) { - __m256i base = _mm256_set1_epi64x(w); - __m256i maskVal = _mm256_set1_epi64x(mask); - - __m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4); - __m256i inc = _mm256_set1_epi64x(bit << 2); - - for (int32_t i = 0; i < batch; ++i) { - __m256i after = _mm256_srlv_epi64(base, shiftBits); - __m256i zigzagVal = _mm256_and_si256(after, maskVal); - - // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) - __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); - signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); - // get the four zigzag values here - __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); - - // calculate the cumulative sum (prefix sum) for each number - // decode[0] = prev_value + final[0] - // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] - // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2] - // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3] - - // 1, 2, 3, 4 - //+ 0, 1, 0, 3 - // 1, 3, 3, 7 - // shift and add for the first round - __m128i prev = _mm_set1_epi64x(prev_value); - __m256i x = _mm256_slli_si256(delta, 8); - - delta = _mm256_add_epi64(delta, x); - _mm256_storeu_si256((__m256i *)&p[_pos], delta); - - // 1, 3, 3, 7 - //+ 0, 0, 3, 3 - // 1, 3, 6, 10 - // shift and add operation for the second round - __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]); - __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]); - __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem); - firstPart = _mm_add_epi64(firstPart, prev); - secPart = _mm_add_epi64(secPart, prev); - - // save it in the memory - _mm_storeu_si128((__m128i *)&p[_pos], firstPart); - _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart); - - shiftBits = _mm256_add_epi64(shiftBits, inc); - prev_value = p[_pos + 3]; -// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]); - _pos += 4; - } - - // handle the remain value - for (int32_t i = 0; i < remain; i++) { - zigzag_value = ((w >> (v + (batch * bit * 4))) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = prev_value; -// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]); - - v += bit; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = prev_value; -// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]); - - v += bit; - } - } - } - } break; - case TSDB_DATA_TYPE_INT: { - int32_t* p = (int32_t*) output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int32_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int32_t)prev_value; - v += bit; - } - } - } break; - case TSDB_DATA_TYPE_SMALLINT: { - int16_t* p = (int16_t*) output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int16_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int16_t)prev_value; - v += bit; - } - } - } break; - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *p = (int8_t *)output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int8_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int8_t)prev_value; - v += bit; - } - } - } break; - } - - ip += LONG_BYTES; - } - - return nelements * word_length; -#else - while (1) { if (count == nelements) break; @@ -644,6 +435,8 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c // TODO: Take care here, we assumes little endian encoding. int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { int32_t _pos = 1; + int32_t longBytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + ASSERTS(nelements >= 0, "nelements is negative"); if (nelements == 0) return 0; @@ -684,25 +477,25 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, } flags = flag1 | (flag2 << 4); // Encode the flag. - if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, &flags, CHAR_BYTES); _pos += CHAR_BYTES; /* Here, we assume it is little endian encoding method. */ // Encode dd1 if (is_bigendian()) { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1); + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1); } else { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd1), flag1); } _pos += flag1; // Encode dd2; if (is_bigendian()) { - if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd2) + LONG_BYTES - flag2, flag2); + if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2); } else { - if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd2), flag2); } _pos += flag2; @@ -715,15 +508,15 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, flag2 = 0; flags = flag1 | (flag2 << 4); // Encode the flag. - if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, &flags, CHAR_BYTES); _pos += CHAR_BYTES; // Encode dd1; if (is_bigendian()) { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1); + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1); } else { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd1), flag1); } _pos += flag1; @@ -734,17 +527,19 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, _exit_over: output[0] = 0; // Means the string is not compressed - memcpy(output + 1, input, nelements * LONG_BYTES); - return nelements * LONG_BYTES + 1; + memcpy(output + 1, input, nelements * longBytes); + return nelements * longBytes + 1; } int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { + int64_t longBytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + ASSERTS(nelements >= 0, "nelements is negative"); if (nelements == 0) return 0; if (input[0] == 0) { - memcpy(output, input + 1, nelements * LONG_BYTES); - return nelements * LONG_BYTES; + memcpy(output, input + 1, nelements * longBytes); + return nelements * longBytes; } else if (input[0] == 1) { // Decompress int64_t *ostream = (int64_t *)output; @@ -763,7 +558,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement delta_of_delta = 0; } else { if (is_bigendian()) { - memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes); + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); } else { memcpy(&dd1, input + ipos, nbytes); } @@ -779,7 +574,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; } - if (opos == nelements) return nelements * LONG_BYTES; + if (opos == nelements) return nelements * longBytes; // Decode dd2 uint64_t dd2 = 0; @@ -788,7 +583,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement delta_of_delta = 0; } else { if (is_bigendian()) { - memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes); + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); } else { memcpy(&dd2, input + ipos, nbytes); } @@ -799,7 +594,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement prev_delta = delta_of_delta + prev_delta; prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; - if (opos == nelements) return nelements * LONG_BYTES; + if (opos == nelements) return nelements * longBytes; } } else { @@ -807,11 +602,13 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement return -1; } } -/* --------------------------------------------Double Compression - * ---------------------------------------------- */ + +/* --------------------------------------------Double Compression ---------------------------------------------- */ void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) { + int32_t longBytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + uint8_t nbytes = (flag & INT8MASK(3)) + 1; - int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); + int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); diff >>= nshift; while (nbytes) { @@ -906,12 +703,14 @@ int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, ch } FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) { + int32_t longBytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + uint64_t diff = 0ul; int32_t nbytes = (flag & 0x7) + 1; for (int32_t i = 0; i < nbytes; i++) { diff |= (((uint64_t)0xff & input[(*ipos)++]) << BITS_PER_BYTE * i); } - int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); + int32_t shift_width = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); diff <<= shift_width; return diff; @@ -1061,14 +860,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t return diff; } -int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) { - float *ostream = (float *)output; - - if (input[0] == 1) { - memcpy(output, input + 1, nelements * FLOAT_BYTES); - return nelements * FLOAT_BYTES; - } - +void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) { uint8_t flags = 0; int32_t ipos = 1; int32_t opos = 0; @@ -1094,6 +886,21 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c ostream[opos++] = curr.real; } +} + +int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) { + if (input[0] == 1) { + memcpy(output, input + 1, nelements * FLOAT_BYTES); + return nelements * FLOAT_BYTES; + } + + if (tsSIMDEnable && tsAVX2Enable) { + tsDecompressFloatImplAvx2(input, nelements, output); + } else if (tsSIMDEnable && tsAVX512Enable) { + tsDecompressFloatImplAvx512(input, nelements, output); + } else { // alternative implementation without SIMD instructions. + tsDecompressFloatHelper(input, nelements, (float*)output); + } return nelements * FLOAT_BYTES; } diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c new file mode 100644 index 0000000000..2763e6fd76 --- /dev/null +++ b/source/util/src/tdecompress.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "ttypes.h" +#include "tcompression.h" + +int32_t getWordLength(char type) { + int32_t wordLength = 0; + switch (type) { + case TSDB_DATA_TYPE_BIGINT: + wordLength = LONG_BYTES; + break; + case TSDB_DATA_TYPE_INT: + wordLength = INT_BYTES; + break; + case TSDB_DATA_TYPE_SMALLINT: + wordLength = SHORT_BYTES; + break; + case TSDB_DATA_TYPE_TINYINT: + wordLength = CHAR_BYTES; + break; + default: + uError("Invalid decompress integer type:%d", type); + return -1; + } + + return wordLength; +} + +int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) { + int32_t word_length = getWordLength(type); + + // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; + int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; + + const char *ip = input + 1; + int32_t count = 0; + int32_t _pos = 0; + int64_t prev_value = 0; + + while (1) { + if (_pos == nelements) break; + + uint64_t w = 0; + memcpy(&w, ip, LONG_BYTES); + + char selector = (char)(w & INT64MASK(4)); // selector = 4 + char bit = bit_per_integer[(int32_t)selector]; // bit = 3 + int32_t elems = selector_to_elems[(int32_t)selector]; + + // Optimize the performance, by remove the constantly switch operation. + int32_t v = 4; + uint64_t zigzag_value = 0; + uint64_t mask = INT64MASK(bit); + + switch (type) { + case TSDB_DATA_TYPE_BIGINT: { + int64_t* p = (int64_t*) output; + + int32_t gRemainder = (nelements - _pos); + int32_t num = (gRemainder > elems)? elems:gRemainder; + + int32_t batch = num >> 2; + int32_t remain = num & 0x03; + if (selector == 0 || selector == 1) { + if (tsSIMDEnable && tsAVX2Enable) { + for (int32_t i = 0; i < batch; ++i) { + __m256i prev = _mm256_set1_epi64x(prev_value); + _mm256_storeu_si256((__m256i *)&p[_pos], prev); + _pos += 4; + } + + for (int32_t i = 0; i < remain; ++i) { + p[_pos++] = prev_value; + } + } else if (tsSIMDEnable && tsAVX512Enable) { +#if __AVX512F__ + // todo add avx512 impl +#endif + } else { // alternative implementation without SIMD instructions. + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = prev_value; + v += bit; + } + } + } else { + if (tsSIMDEnable && tsAVX2Enable) { + __m256i base = _mm256_set1_epi64x(w); + __m256i maskVal = _mm256_set1_epi64x(mask); + + __m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4); + __m256i inc = _mm256_set1_epi64x(bit << 2); + + for (int32_t i = 0; i < batch; ++i) { + __m256i after = _mm256_srlv_epi64(base, shiftBits); + __m256i zigzagVal = _mm256_and_si256(after, maskVal); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); + signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); + + // get the four zigzag values here + __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); + + // calculate the cumulative sum (prefix sum) for each number + // decode[0] = prev_value + final[0] + // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] + // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2] + // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3] + + // 1, 2, 3, 4 + //+ 0, 1, 0, 3 + // 1, 3, 3, 7 + // shift and add for the first round + __m128i prev = _mm_set1_epi64x(prev_value); + __m256i x = _mm256_slli_si256(delta, 8); + + delta = _mm256_add_epi64(delta, x); + _mm256_storeu_si256((__m256i *)&p[_pos], delta); + + // 1, 3, 3, 7 + //+ 0, 0, 3, 3 + // 1, 3, 6, 10 + // shift and add operation for the second round + __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]); + __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]); + __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem); + firstPart = _mm_add_epi64(firstPart, prev); + secPart = _mm_add_epi64(secPart, prev); + + // save it in the memory + _mm_storeu_si128((__m128i *)&p[_pos], firstPart); + _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart); + + shiftBits = _mm256_add_epi64(shiftBits, inc); + prev_value = p[_pos + 3]; + _pos += 4; + } + + // handle the remain value + for (int32_t i = 0; i < remain; i++) { + zigzag_value = ((w >> (v + (batch * bit * 4))) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = prev_value; + v += bit; + } + } else if (tsSIMDEnable && tsAVX512Enable) { +#if __AVX512F__ + // todo add avx512 impl +#endif + } else { // alternative implementation without SIMD instructions. + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = prev_value; + v += bit; + } + } + } + } break; + case TSDB_DATA_TYPE_INT: { + int32_t* p = (int32_t*) output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int32_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int32_t)prev_value; + v += bit; + } + } + } break; + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* p = (int16_t*) output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int16_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int16_t)prev_value; + v += bit; + } + } + } break; + + case TSDB_DATA_TYPE_TINYINT: { + int8_t *p = (int8_t *)output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int8_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int8_t)prev_value; + v += bit; + } + } + } break; + } + + ip += LONG_BYTES; + } + + return nelements * word_length; +} + +int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) { +#if __AVX512F__ + // todo add it +#endif + return 0; +} + +// todo add later +int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { +#if __AVX2__ +#endif + return 0; +} \ No newline at end of file