Merge pull request #23673 from taosdata/fix/3_liaohj

fix(stream): fix deadlock and do some internal refactor.
This commit is contained in:
Haojun Liao 2023-11-14 11:26:54 +08:00 committed by GitHub
commit 436c7871a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 351 additions and 261 deletions

View File

@ -170,7 +170,7 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED")
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")

View File

@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);

View File

@ -30,6 +30,10 @@ extern "C" {
// Masks with the low `_x` bits set, in 64-, 32- and 8-bit flavours.
// The argument is parenthesized so that compound expressions (e.g. `a & b`
// or `c ? x : y`) are evaluated as a whole before being used as the shift
// count. `_x` must be smaller than the bit width of the shifted operand.
#define INT64MASK(_x) ((((uint64_t)1) << (_x)) - 1)
#define INT32MASK(_x) (((uint32_t)1 << (_x)) - 1)
#define INT8MASK(_x) (((uint8_t)1 << (_x)) - 1)
// Zigzag coding between a signed type T and its unsigned counterpart u##T
// (T must be a plain type name such as int64_t, so that u##T names uint64_t).
// Encoding interleaves negative and non-negative values: 0 -> 0, -1 -> 1, 1 -> 2, ...
// NOTE: the encoder relies on `>>` of a negative value being an arithmetic shift.
#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
// Compression algorithm
#define NO_COMPRESSION 0
#define ONE_STAGE_COMP 1
@ -129,6 +133,12 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
int32_t nBuf);
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf);
// for internal usage
int32_t getWordLength(char type);
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
/*************************************************************************
* STREAM COMPRESSION

View File

@ -34,7 +34,6 @@ extern "C" {
// Bytes for each type.
extern const int32_t TYPE_BYTES[21];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t)

View File

@ -1974,6 +1974,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);

View File

@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
streamMetaWLock(pWriter->pTq->pStreamMeta);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) {
streamMetaInitBackend(pWriter->pTq->pStreamMeta);
code = streamStateLoadTasks(pWriter);
}
streamMetaWUnLock(pWriter->pTq->pStreamMeta);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter);
return code;

View File

@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
return code;
}
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);

View File

@ -262,18 +262,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
}
// todo: not wait in a critical region
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
// todo refactor: the lock should be restricted in one function
//
// Create the stream backend for pMeta, retrying until it succeeds, then
// register it in the reference pool and load the checkpoint info.
//
// Locking contract, as implemented below: the function is entered with the
// meta write lock held (it calls streamMetaWUnLock() before retrying) and it
// returns with that lock held again. The lock is released only around the
// 100ms sleep between retries, so other users of the meta are not blocked
// while this thread waits.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
  if (pMeta->streamBackend == NULL) {
    // First attempt failed: drop the lock and fall into the retry loop.
    streamMetaWUnLock(pMeta);

    while (1) {
      // Every attempt is made with the write lock held; on success we leave
      // the loop still holding it, matching the state on entry.
      streamMetaWLock(pMeta);
      pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
      if (pMeta->streamBackend != NULL) {
        break;
      }

      streamMetaWUnLock(pMeta);
      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
      taosMsleep(100);
    }
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
void streamMetaClear(SStreamMeta* pMeta) {

View File

@ -52,6 +52,7 @@
#include "lz4.h"
#include "tRealloc.h"
#include "tlog.h"
#include "ttypes.h"
#ifdef TD_TSZ
#include "td_sz.h"
@ -62,8 +63,6 @@ static const int32_t TEST_NUMBER = 1;
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
#ifdef TD_TSZ
bool lossyFloat = false;
@ -99,24 +98,7 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
// get the byte limit.
int32_t word_length = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
word_length = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
word_length = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
word_length = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
word_length = CHAR_BYTES;
break;
default:
uError("Invalid compress integer type:%d", type);
return -1;
}
int32_t word_length = getWordLength(type);
int32_t byte_limit = nelements * word_length + 1;
int32_t opos = 1;
@ -221,24 +203,9 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
}
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
int32_t word_length = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
word_length = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
word_length = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
word_length = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
word_length = CHAR_BYTES;
break;
default:
uError("Invalid decompress integer type:%d", type);
return -1;
int32_t word_length = getWordLength(type);
if (word_length == -1) {
return word_length;
}
// If not compressed.
@ -247,8 +214,11 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
return nelements * word_length;
}
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11
// 12 13 14 15
#if __AVX2__
tsDecompressIntImpl_Hw(input, nelements, output, type);
return nelements * word_length;
#else
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
@ -257,185 +227,6 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int32_t _pos = 0;
int64_t prev_value = 0;
#if __AVX2__
while (1) {
if (_pos == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
int32_t elems = selector_to_elems[(int32_t)selector];
// Optimize the performance, by remove the constantly switch operation.
int32_t v = 4;
uint64_t zigzag_value = 0;
uint64_t mask = INT64MASK(bit);
switch (type) {
case TSDB_DATA_TYPE_BIGINT: {
int64_t* p = (int64_t*) output;
int32_t gRemainder = (nelements - _pos);
int32_t num = (gRemainder > elems)? elems:gRemainder;
int32_t batch = num >> 2;
int32_t remain = num & 0x03;
if (selector == 0 || selector == 1) {
if (tsAVX2Enable && tsSIMDEnable) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prev_value);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
_pos += 4;
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prev_value;
v += bit;
}
}
} else {
if (tsAVX2Enable && tsSIMDEnable) {
__m256i base = _mm256_set1_epi64x(w);
__m256i maskVal = _mm256_set1_epi64x(mask);
__m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
__m256i inc = _mm256_set1_epi64x(bit << 2);
for (int32_t i = 0; i < batch; ++i) {
__m256i after = _mm256_srlv_epi64(base, shiftBits);
__m256i zigzagVal = _mm256_and_si256(after, maskVal);
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);
// get the four zigzag values here
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4
//+ 0, 1, 0, 3
// 1, 3, 3, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value);
__m256i x = _mm256_slli_si256(delta, 8);
delta = _mm256_add_epi64(delta, x);
_mm256_storeu_si256((__m256i *)&p[_pos], delta);
// 1, 3, 3, 7
//+ 0, 0, 3, 3
// 1, 3, 6, 10
// shift and add operation for the second round
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
__m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
firstPart = _mm_add_epi64(firstPart, prev);
secPart = _mm_add_epi64(secPart, prev);
// save it in the memory
_mm_storeu_si128((__m128i *)&p[_pos], firstPart);
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3];
// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]);
_pos += 4;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit;
}
}
}
} break;
case TSDB_DATA_TYPE_INT: {
int32_t* p = (int32_t*) output;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int32_t)prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int32_t)prev_value;
v += bit;
}
}
} break;
case TSDB_DATA_TYPE_SMALLINT: {
int16_t* p = (int16_t*) output;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int16_t)prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int16_t)prev_value;
v += bit;
}
}
} break;
case TSDB_DATA_TYPE_TINYINT: {
int8_t *p = (int8_t *)output;
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int8_t)prev_value;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int8_t)prev_value;
v += bit;
}
}
} break;
}
ip += LONG_BYTES;
}
return nelements * word_length;
#else
while (1) {
if (count == nelements) break;
@ -644,6 +435,8 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
// TODO: Take care here, we assumes little endian encoding.
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int32_t _pos = 1;
int32_t longBytes = LONG_BYTES;
ASSERTS(nelements >= 0, "nelements is negative");
if (nelements == 0) return 0;
@ -684,25 +477,25 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
}
flags = flag1 | (flag2 << 4);
// Encode the flag.
if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, &flags, CHAR_BYTES);
_pos += CHAR_BYTES;
/* Here, we assume it is little endian encoding method. */
// Encode dd1
if (is_bigendian()) {
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
} else {
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1), flag1);
}
_pos += flag1;
// Encode dd2;
if (is_bigendian()) {
if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
memcpy(output + _pos, (char *)(&dd2) + LONG_BYTES - flag2, flag2);
if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2);
} else {
if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd2), flag2);
}
_pos += flag2;
@ -715,15 +508,15 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
flag2 = 0;
flags = flag1 | (flag2 << 4);
// Encode the flag.
if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, &flags, CHAR_BYTES);
_pos += CHAR_BYTES;
// Encode dd1;
if (is_bigendian()) {
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
} else {
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
memcpy(output + _pos, (char *)(&dd1), flag1);
}
_pos += flag1;
@ -734,17 +527,19 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
_exit_over:
output[0] = 0; // Means the string is not compressed
memcpy(output + 1, input, nelements * LONG_BYTES);
return nelements * LONG_BYTES + 1;
memcpy(output + 1, input, nelements * longBytes);
return nelements * longBytes + 1;
}
int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int64_t longBytes = LONG_BYTES;
ASSERTS(nelements >= 0, "nelements is negative");
if (nelements == 0) return 0;
if (input[0] == 0) {
memcpy(output, input + 1, nelements * LONG_BYTES);
return nelements * LONG_BYTES;
memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes;
} else if (input[0] == 1) { // Decompress
int64_t *ostream = (int64_t *)output;
@ -763,7 +558,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes);
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
@ -779,7 +574,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
}
if (opos == nelements) return nelements * LONG_BYTES;
if (opos == nelements) return nelements * longBytes;
// Decode dd2
uint64_t dd2 = 0;
@ -788,7 +583,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes);
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
@ -799,7 +594,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
if (opos == nelements) return nelements * LONG_BYTES;
if (opos == nelements) return nelements * longBytes;
}
} else {
@ -807,11 +602,13 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
return -1;
}
}
/* --------------------------------------------Double Compression
* ---------------------------------------------- */
/* --------------------------------------------Double Compression ---------------------------------------------- */
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
int32_t longBytes = LONG_BYTES;
uint8_t nbytes = (flag & INT8MASK(3)) + 1;
int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff >>= nshift;
while (nbytes) {
@ -906,12 +703,14 @@ int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, ch
}
FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
int32_t longBytes = LONG_BYTES;
uint64_t diff = 0ul;
int32_t nbytes = (flag & 0x7) + 1;
for (int32_t i = 0; i < nbytes; i++) {
diff |= (((uint64_t)0xff & input[(*ipos)++]) << BITS_PER_BYTE * i);
}
int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
int32_t shift_width = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
diff <<= shift_width;
return diff;
@ -1061,14 +860,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t
return diff;
}
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
float *ostream = (float *)output;
if (input[0] == 1) {
memcpy(output, input + 1, nelements * FLOAT_BYTES);
return nelements * FLOAT_BYTES;
}
static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) {
uint8_t flags = 0;
int32_t ipos = 1;
int32_t opos = 0;
@ -1094,6 +886,21 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c
ostream[opos++] = curr.real;
}
}
// Decompress `nelements` float values from `input` into `output`.
//
// input[0] is a header byte: the value 1 means the payload that follows is
// stored raw and is copied verbatim; any other value selects the compressed
// decode path. Returns the number of bytes written to `output`
// (nelements * FLOAT_BYTES).
//
// The SIMD kernels are consulted first when enabled at runtime. They report
// the number of bytes they decoded; a result <= 0 means the kernel produced
// nothing (the AVX2/AVX512 float kernels are currently placeholders that
// return 0), in which case the scalar decoder is used so that `output` is
// never left unfilled while success is reported.
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
  if (input[0] == 1) {
    memcpy(output, input + 1, nelements * FLOAT_BYTES);
    return nelements * FLOAT_BYTES;
  }

  int32_t decoded = 0;
  if (tsSIMDEnable && tsAVX2Enable) {
    decoded = tsDecompressFloatImplAvx2(input, nelements, output);
  } else if (tsSIMDEnable && tsAVX512Enable) {
    decoded = tsDecompressFloatImplAvx512(input, nelements, output);
  }

  if (decoded <= 0) {
    // No SIMD kernel selected, or the selected kernel decoded nothing:
    // fall back to the implementation without SIMD instructions.
    tsDecompressFloatHelper(input, nelements, (float *)output);
  }

  return nelements * FLOAT_BYTES;
}

View File

@ -0,0 +1,251 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "ttypes.h"
#include "tcompression.h"
// Map an integer column type to the size in bytes of one value of that type.
// Supported types are BIGINT, INT, SMALLINT and TINYINT; for any other type
// an error is logged and -1 is returned.
int32_t getWordLength(char type) {
  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      return CHAR_BYTES;
    case TSDB_DATA_TYPE_SMALLINT:
      return SHORT_BYTES;
    case TSDB_DATA_TYPE_INT:
      return INT_BYTES;
    case TSDB_DATA_TYPE_BIGINT:
      return LONG_BYTES;
    default:
      break;
  }

  // Not one of the integer types handled above.
  uError("Invalid decompress integer type:%d", type);
  return -1;
}
// Decode `nelements` delta + zigzag encoded integers of the given `type`
// from `input` into `output`, using AVX2 for BIGINT when it is enabled at
// runtime (tsSIMDEnable && tsAVX2Enable) and a scalar loop otherwise.
//
// Layout consumed here: input[0] is a header byte that is skipped; after it
// comes a sequence of 8-byte words. The low 4 bits of each word are a
// selector that fixes how many values the word carries (selector_to_elems)
// and how many bits each one uses (bit_per_integer). Selectors 0 and 1 carry
// no payload bits: they repeat the previous value. Every other value is a
// zigzag-encoded delta that is added to the running previous value.
//
// Returns nelements * (size of one value), or -1 when `type` is not a
// supported integer type.
//
// NOTE(review): the input length is not known here, so the words are read
// without bounds checking; the caller must supply a well-formed buffer.
// NOTE(review): when the file is compiled without __AVX2__ the decode loop is
// compiled out and `output` is left untouched; callers are presumably
// expected to use this function only in AVX2 builds - confirm.
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
  int32_t word_length = getWordLength(type);
  if (word_length < 0) {
    // Unsupported type: none of the switch cases below would consume any
    // element, so the decode loop could never terminate.
    return -1;
  }

  // Selector value:            0    1   2   3   4   5   6   7  8  9 10 11 12 13 14 15
  char    bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
  int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};

  const char *ip = input + 1;
  int32_t     _pos = 0;  // number of values written to output so far
  int64_t     prev_value = 0;

#if __AVX2__
  while (_pos < nelements) {
    uint64_t w = 0;
    memcpy(&w, ip, LONG_BYTES);

    char    selector = (char)(w & INT64MASK(4));     // low 4 bits: layout selector
    char    bit = bit_per_integer[(int32_t)selector];  // bits used by each value in this word
    int32_t elems = selector_to_elems[(int32_t)selector];

    // Optimize the performance, by remove the constantly switch operation.
    int32_t  v = 4;  // bit offset of the next value inside w (payload starts after the selector)
    uint64_t zigzag_value = 0;
    uint64_t mask = INT64MASK(bit);

    switch (type) {
      case TSDB_DATA_TYPE_BIGINT: {
        int64_t *p = (int64_t *)output;

        int32_t gRemainder = (nelements - _pos);
        int32_t num = (gRemainder > elems) ? elems : gRemainder;  // values to take from this word
        int32_t batch = num >> 2;                                 // groups of four handled with AVX2
        int32_t remain = num & 0x03;                              // leftover values handled one by one

        if (selector == 0 || selector == 1) {
          if (tsSIMDEnable && tsAVX2Enable) {
            // Run of identical values: broadcast and store four at a time.
            __m256i prev = _mm256_set1_epi64x(prev_value);
            for (int32_t i = 0; i < batch; ++i) {
              _mm256_storeu_si256((__m256i *)&p[_pos], prev);
              _pos += 4;
            }

            for (int32_t i = 0; i < remain; ++i) {
              p[_pos++] = prev_value;
            }
          } else {
            // Implementation without SIMD instructions. Also taken when only
            // AVX512 is enabled, since there is no AVX512 kernel yet
            // (todo add avx512 impl).
            for (int32_t i = 0; i < elems && _pos < nelements; i++) {
              p[_pos++] = prev_value;
            }
          }
        } else {
          if (tsSIMDEnable && tsAVX2Enable) {
            __m256i base = _mm256_set1_epi64x(w);
            __m256i maskVal = _mm256_set1_epi64x(mask);

            // Per-lane shift counts that extract four consecutive values.
            __m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
            __m256i inc = _mm256_set1_epi64x(bit << 2);

            for (int32_t i = 0; i < batch; ++i) {
              __m256i after = _mm256_srlv_epi64(base, shiftBits);
              __m256i zigzagVal = _mm256_and_si256(after, maskVal);

              // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
              __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
              signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);

              // get the four zigzag values here
              __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);

              // calculate the cumulative sum (prefix sum) for each number
              // decode[0] = prev_value + final[0]
              // decode[1] = decode[0] + final[1]   -----> prev_value + final[0] + final[1]
              // decode[2] = decode[1] + final[2]   -----> prev_value + final[0] + final[1] + final[2]
              // decode[3] = decode[2] + final[3]   -----> prev_value + final[0] + final[1] + final[2] + final[3]

              //  1, 2, 3, 4
              //+ 0, 1, 0, 3
              //  1, 3, 3, 7
              // shift and add for the first round
              __m128i prev = _mm_set1_epi64x(prev_value);
              __m256i x = _mm256_slli_si256(delta, 8);

              delta = _mm256_add_epi64(delta, x);
              _mm256_storeu_si256((__m256i *)&p[_pos], delta);

              //  1, 3, 3, 7
              //+ 0, 0, 3, 3
              //  1, 3, 6, 10
              // shift and add operation for the second round
              __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
              __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
              __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
              firstPart = _mm_add_epi64(firstPart, prev);
              secPart = _mm_add_epi64(secPart, prev);

              // save it in the memory
              _mm_storeu_si128((__m128i *)&p[_pos], firstPart);
              _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);

              shiftBits = _mm256_add_epi64(shiftBits, inc);
              prev_value = p[_pos + 3];
              _pos += 4;
            }

            // handle the remain value
            for (int32_t i = 0; i < remain; i++) {
              zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
              prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

              p[_pos++] = prev_value;
              v += bit;
            }
          } else {
            // Implementation without SIMD instructions. Also taken when only
            // AVX512 is enabled, since there is no AVX512 kernel yet
            // (todo add avx512 impl).
            for (int32_t i = 0; i < elems && _pos < nelements; i++) {
              zigzag_value = ((w >> v) & mask);
              prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

              p[_pos++] = prev_value;
              v += bit;
            }
          }
        }
      } break;
      case TSDB_DATA_TYPE_INT: {
        int32_t *p = (int32_t *)output;

        if (selector == 0 || selector == 1) {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            p[_pos++] = (int32_t)prev_value;
          }
        } else {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            zigzag_value = ((w >> v) & mask);
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

            p[_pos++] = (int32_t)prev_value;
            v += bit;
          }
        }
      } break;
      case TSDB_DATA_TYPE_SMALLINT: {
        int16_t *p = (int16_t *)output;

        if (selector == 0 || selector == 1) {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            p[_pos++] = (int16_t)prev_value;
          }
        } else {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            zigzag_value = ((w >> v) & mask);
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

            p[_pos++] = (int16_t)prev_value;
            v += bit;
          }
        }
      } break;
      case TSDB_DATA_TYPE_TINYINT: {
        int8_t *p = (int8_t *)output;

        if (selector == 0 || selector == 1) {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            p[_pos++] = (int8_t)prev_value;
          }
        } else {
          for (int32_t i = 0; i < elems && _pos < nelements; i++) {
            zigzag_value = ((w >> v) & mask);
            prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);

            p[_pos++] = (int8_t)prev_value;
            v += bit;
          }
        }
      } break;
    }

    ip += LONG_BYTES;
  }
#endif

  return nelements * word_length;
}
// Placeholder for the AVX-512 float decompression kernel.
// Nothing is decoded yet: `output` is left untouched and 0 is returned
// regardless of the arguments.
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) {
  // todo add it: the AVX-512 (__AVX512F__) implementation goes here.
  (void)input;
  (void)nelements;
  (void)output;
  return 0;
}
// Placeholder for the AVX2 float decompression kernel (todo add later).
// Nothing is decoded yet: `output` is left untouched and 0 is returned
// regardless of the arguments.
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) {
  // The AVX2 (__AVX2__) implementation goes here.
  (void)input;
  (void)nelements;
  (void)output;
  return 0;
}