more code

This commit is contained in:
Hongze Cheng 2024-03-03 16:01:12 +08:00
parent d7a44a22f4
commit 9b8dec7691
12 changed files with 450 additions and 762 deletions

View File

@ -112,7 +112,7 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value);
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo,
SValueColumn *valCol, SBuffer *buffer);
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer);
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBuffer *buffer);
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo);
int32_t tValueCompare(const SValue *tv1, const SValue *tv2);

View File

@ -32,45 +32,38 @@ static int32_t tBufferInit(SBuffer *buffer);
static int32_t tBufferDestroy(SBuffer *buffer);
static int32_t tBufferClear(SBuffer *buffer);
static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity);
static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size);
static int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data);
static int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size);
static int32_t tBufferPutI8(SBuffer *buffer, int8_t value);
static int32_t tBufferPutI16(SBuffer *buffer, int16_t value);
static int32_t tBufferPutI32(SBuffer *buffer, int32_t value);
static int32_t tBufferPutI64(SBuffer *buffer, int64_t value);
static int32_t tBufferPutU8(SBuffer *buffer, uint8_t value);
static int32_t tBufferPutU16(SBuffer *buffer, uint16_t value);
static int32_t tBufferPutU32(SBuffer *buffer, uint32_t value);
static int32_t tBufferPutU64(SBuffer *buffer, uint64_t value);
static int32_t tBufferPutI16v(SBuffer *buffer, int16_t value);
static int32_t tBufferPutI32v(SBuffer *buffer, int32_t value);
static int32_t tBufferPutI64v(SBuffer *buffer, int64_t value);
static int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value);
static int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value);
static int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value);
static int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size);
static int32_t tBufferPutCStr(SBuffer *buffer, const char *str);
static int32_t tBufferPutF32(SBuffer *buffer, float value);
static int32_t tBufferPutF64(SBuffer *buffer, double value);
#define tBufferGetSize(buffer) ((buffer)->size)
#define tBufferGetCapacity(buffer) ((buffer)->capacity)
#define tBufferGetData(buffer) ((buffer)->data)
#define tBufferGetDataAt(buffer, idx) ((char *)(buffer)->data + (idx))
#define tBufferGetDataEnd(buffer) ((char *)(buffer)->data + (buffer)->size)
// SBufferWriter
#define BUFFER_WRITER_INITIALIZER(forward, offset, buffer) ((SBufferWriter){forward, offset, buffer})
#define tBufferWriterDestroy(writer) ((void)0)
#define tBufferWriterGetOffset(writer) ((writer)->offset)
static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer);
static int32_t tBufferPutFixed(SBufferWriter *writer, const void *data, uint32_t size);
static int32_t tBufferPutI8(SBufferWriter *writer, int8_t value);
static int32_t tBufferPutI16(SBufferWriter *writer, int16_t value);
static int32_t tBufferPutI32(SBufferWriter *writer, int32_t value);
static int32_t tBufferPutI64(SBufferWriter *writer, int64_t value);
static int32_t tBufferPutU8(SBufferWriter *writer, uint8_t value);
static int32_t tBufferPutU16(SBufferWriter *writer, uint16_t value);
static int32_t tBufferPutU32(SBufferWriter *writer, uint32_t value);
static int32_t tBufferPutU64(SBufferWriter *writer, uint64_t value);
static int32_t tBufferPutI16v(SBufferWriter *writer, int16_t value);
static int32_t tBufferPutI32v(SBufferWriter *writer, int32_t value);
static int32_t tBufferPutI64v(SBufferWriter *writer, int64_t value);
static int32_t tBufferPutU16v(SBufferWriter *writer, uint16_t value);
static int32_t tBufferPutU32v(SBufferWriter *writer, uint32_t value);
static int32_t tBufferPutU64v(SBufferWriter *writer, uint64_t value);
static int32_t tBufferPutBinary(SBufferWriter *writer, const void *data, uint32_t size);
static int32_t tBufferPutCStr(SBufferWriter *writer, const char *str);
static int32_t tBufferPutF32(SBufferWriter *writer, float value);
static int32_t tBufferPutF64(SBufferWriter *writer, double value);
// SBufferReader
#define BUFFER_READER_INITIALIZER(forward, offset, buffer) ((SBufferReader){forward, offset, buffer})
#define tBufferReaderDestroy(reader) ((void)0)
#define tBufferReaderGetOffset(reader) ((reader)->offset)
static int32_t tBufferReaderInit(SBufferReader *reader, bool forward, uint32_t offset, SBuffer *buffer);
static int32_t tBufferGetFixed(SBufferReader *reader, void *data, uint32_t size);
#define BUFFER_READER_INITIALIZER(offset, buffer) ((SBufferReader){offset, buffer})
#define tBufferReaderDestroy(reader) ((void)0)
#define tBufferReaderGetOffset(reader) ((reader)->offset)
static int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data);
static int32_t tBufferReaderInit(SBufferReader *reader, uint32_t offset, SBuffer *buffer);
static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value);
static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value);
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value);

View File

@ -29,7 +29,6 @@ struct SBufferWriter {
};
struct SBufferReader {
bool forward;
uint32_t offset;
SBuffer *buffer;
};
@ -73,7 +72,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa
return 0;
}
static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) {
static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size) {
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
if (code) return code;
memcpy((char *)buffer->data + buffer->size, data, size);
@ -81,184 +80,132 @@ static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uin
return 0;
}
static FORCE_INLINE int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data) {
if (index < 0 || (index + 1) * size > buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
memcpy(data, (char *)buffer->data + index * size, size);
return 0;
static FORCE_INLINE int32_t tBufferPutI8(SBuffer *buffer, int8_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
// SBufferWriter
static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer) {
writer->forward = forward;
writer->offset = offset;
writer->buffer = buffer;
return 0;
static FORCE_INLINE int32_t tBufferPutI16(SBuffer *buffer, int16_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutFixed(SBufferWriter *writer, const void *data, uint32_t size) {
if (!writer->forward && writer->offset < size) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t code = tBufferEnsureCapacity(writer->buffer, writer->forward ? writer->offset + size : writer->offset);
if (code) return code;
if (writer->forward) {
memcpy((char *)writer->buffer->data + writer->offset, data, size);
writer->offset += size;
} else {
writer->offset -= size;
memcpy((char *)writer->buffer->data + writer->offset, data, size);
}
return 0;
static FORCE_INLINE int32_t tBufferPutI32(SBuffer *buffer, int32_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutI8(SBufferWriter *writer, int8_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
static FORCE_INLINE int32_t tBufferPutI64(SBuffer *buffer, int64_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutI16(SBufferWriter *writer, int16_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
static FORCE_INLINE int32_t tBufferPutU8(SBuffer *buffer, uint8_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutI32(SBufferWriter *writer, int32_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
static FORCE_INLINE int32_t tBufferPutU16(SBuffer *buffer, uint16_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutI64(SBufferWriter *writer, int64_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
static FORCE_INLINE int32_t tBufferPutU32(SBuffer *buffer, uint32_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutU8(SBufferWriter *writer, uint8_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
static FORCE_INLINE int32_t tBufferPutU64(SBuffer *buffer, uint64_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutU16(SBufferWriter *writer, uint16_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value) { return tBufferPutU64v(buffer, value); }
static FORCE_INLINE int32_t tBufferPutU32(SBufferWriter *writer, uint32_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value) { return tBufferPutU64v(buffer, value); }
static FORCE_INLINE int32_t tBufferPutU64(SBufferWriter *writer, uint64_t value) {
return tBufferPutFixed(writer, &value, sizeof(value));
}
static FORCE_INLINE int32_t tBufferPutU64v(SBufferWriter *writer, uint64_t value) {
static FORCE_INLINE int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value) {
int32_t code;
while (value >= 0x80) {
code = tBufferPutU8(writer, (value & 0x7F) | 0x80);
code = tBufferPutU8(buffer, (value & 0x7F) | 0x80);
if (code) return code;
value >>= 7;
}
return tBufferPutU8(writer, value);
return tBufferPutU8(buffer, value);
}
static FORCE_INLINE int32_t tBufferPutU16v(SBufferWriter *writer, uint16_t value) {
return tBufferPutU64v(writer, value);
static FORCE_INLINE int32_t tBufferPutI16v(SBuffer *buffer, int16_t value) {
return tBufferPutU64v(buffer, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE int32_t tBufferPutU32v(SBufferWriter *writer, uint32_t value) {
return tBufferPutU64v(writer, value);
static FORCE_INLINE int32_t tBufferPutI32v(SBuffer *buffer, int32_t value) {
return tBufferPutU64v(buffer, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE int32_t tBufferPutI16v(SBufferWriter *writer, int16_t value) {
return tBufferPutU16v(writer, ZIGZAGE(int16_t, value));
static FORCE_INLINE int32_t tBufferPutI64v(SBuffer *buffer, int64_t value) {
return tBufferPutU64v(buffer, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE int32_t tBufferPutI32v(SBufferWriter *writer, int32_t value) {
return tBufferPutU32v(writer, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE int32_t tBufferPutI64v(SBufferWriter *writer, int64_t value) {
return tBufferPutU64v(writer, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE int32_t tBufferPutBinary(SBufferWriter *writer, const void *data, uint32_t size) {
int32_t code = tBufferPutU32(writer, size);
static FORCE_INLINE int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size) {
int32_t code = tBufferPutU32v(buffer, size);
if (code) return code;
return tBufferPutFixed(writer, data, size);
return tBufferPut(buffer, data, size);
}
static FORCE_INLINE int32_t tBufferPutCStr(SBufferWriter *writer, const char *str) {
return tBufferPutBinary(writer, str, strlen(str) + 1);
static FORCE_INLINE int32_t tBufferPutCStr(SBuffer *buffer, const char *str) {
return tBufferPutBinary(buffer, str, str ? 0 : strlen(str) + 1);
}
static FORCE_INLINE int32_t tBufferPutF32(SBufferWriter *writer, float value) {
static FORCE_INLINE int32_t tBufferPutF32(SBuffer *buffer, float value) {
union {
float f;
uint32_t u;
} u = {.f = value};
return tBufferPutU32(writer, u.u);
return tBufferPutU32(buffer, u.u);
}
static FORCE_INLINE int32_t tBufferPutF64(SBufferWriter *writer, double value) {
static FORCE_INLINE int32_t tBufferPutF64(SBuffer *buffer, double value) {
union {
double f;
uint64_t u;
} u = {.f = value};
return tBufferPutU64(writer, u.u);
return tBufferPutU64(buffer, u.u);
}
// reader
// SBufferReader
static int32_t tBufferReaderInit(SBufferReader *reader, bool forward, uint32_t offset, SBuffer *buffer) {
reader->forward = forward;
reader->offset = offset;
reader->buffer = buffer;
static int32_t tBufferReaderInit(SBufferReader *reader, uint32_t offset, SBuffer *buffer) {
(*reader) = BUFFER_READER_INITIALIZER(offset, buffer);
return 0;
}
static int32_t tBufferGetFixed(SBufferReader *reader, void *data, uint32_t size) {
if ((reader->forward && reader->offset + size > reader->buffer->capacity) ||
(!reader->forward && reader->offset < size)) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
if (data) {
if (reader->forward) {
memcpy(data, (char *)reader->buffer->data + reader->offset, size);
reader->offset += size;
} else {
reader->offset -= size;
memcpy(data, (char *)reader->buffer->data + reader->offset, size);
}
static FORCE_INLINE int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data) {
if (reader->offset < 0 || reader->offset + size > reader->buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
memcpy(data, (char *)reader->buffer->data + reader->offset, size);
reader->offset += size;
return 0;
}
static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
}
static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value) { return tBufferGet(reader, sizeof(*value), value); }
static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
}
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); }
static int32_t tBufferGetU16(SBufferReader *reader, uint16_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetU32(SBufferReader *reader, uint32_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetU64(SBufferReader *reader, uint64_t *value) {
return tBufferGetFixed(reader, value, sizeof(*value));
return tBufferGet(reader, sizeof(*value), value);
}
static int32_t tBufferGetU64v(SBufferReader *reader, uint64_t *value) {
@ -274,7 +221,7 @@ static int32_t tBufferGetU64v(SBufferReader *reader, uint64_t *value) {
if (code) return code;
if (value) {
*value |= ((uint64_t)(byte & 0x7F)) << (i * 7);
*value |= (((uint64_t)(byte & 0x7F)) << (i * 7));
}
if (byte < 0x80) {
@ -340,29 +287,25 @@ static int32_t tBufferGetBinary(SBufferReader *reader, const void **data, uint32
int32_t code;
// size
code = tBufferGetU32(reader, &tmpSize);
code = tBufferGetU32v(reader, &tmpSize);
if (code) return code;
if (size) {
*size = tmpSize;
}
// data
if (reader->forward) {
if (reader->offset + tmpSize > reader->buffer->capacity) {
return TSDB_CODE_OPS_NOT_SUPPORT;
if (tmpSize > 0) {
if (reader->offset + tmpSize > reader->buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
if (data) {
*data = (char *)reader->buffer->data + reader->offset;
}
reader->offset += tmpSize;
} else {
if (reader->offset < tmpSize) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
reader->offset -= tmpSize;
if (data) {
*data = (char *)reader->buffer->data + reader->offset;
}
*data = NULL;
}
return 0;
}

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "tdataformat.h"
#include "tRealloc.h"
#include "tcoding.h"
#include "tdatablock.h"
#include "tlog.h"
@ -3993,15 +3992,14 @@ int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value) {
ASSERT(value->type == valCol->type);
if (IS_VAR_DATA_TYPE(value->type)) {
int32_t offset = tBufferGetSize(&valCol->data);
if ((code = tBufferAppend(&valCol->offsets, &offset, sizeof(offset)))) {
if ((code = tBufferPutI32(&valCol->offsets, tBufferGetSize(&valCol->data)))) {
return code;
}
if ((code = tBufferAppend(&valCol->data, value->pData, value->nData))) {
if ((code = tBufferPut(&valCol->data, value->pData, value->nData))) {
return code;
}
} else {
return tBufferAppend(&valCol->data, &value->val, tDataTypes[value->type].bytes);
return tBufferPut(&valCol->data, &value->val, tDataTypes[value->type].bytes);
}
valCol->numOfValues++;
@ -4015,18 +4013,20 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) {
value->type = valCol->type;
if (IS_VAR_DATA_TYPE(value->type)) {
int32_t offset, nextOffset;
int32_t offset, nextOffset;
SBufferReader reader = BUFFER_READER_INITIALIZER(idx * sizeof(offset), &valCol->offsets);
tBufferGet(&valCol->offsets, idx, sizeof(offset), &offset);
tBufferGetI32(&reader, &offset);
if (idx == valCol->numOfValues - 1) {
nextOffset = tBufferGetSize(&valCol->data);
} else {
tBufferGet(&valCol->offsets, idx + 1, sizeof(nextOffset), &nextOffset);
tBufferGetI32(&reader, &nextOffset);
}
value->nData = nextOffset - offset;
value->pData = (uint8_t *)tBufferGetDataAt(&valCol->data, offset);
} else {
tBufferGet(&valCol->data, idx, tDataTypes[value->type].bytes, &value->val);
SBufferReader reader = BUFFER_READER_INITIALIZER(idx * tDataTypes[value->type].bytes, &valCol->data);
tBufferGet(&reader, tDataTypes[value->type].bytes, &value->val);
}
return 0;
}
@ -4140,54 +4140,42 @@ int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColum
return 0;
}
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer) {
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBuffer *buffer) {
int32_t code;
uint8_t formatVersion = 0;
uint8_t fmtVer = 0;
// format version
code = tBufferPutU8(writer, formatVersion);
if (code) return code;
// struct info
code = tBufferPutI8(writer, compressInfo->cmprAlg);
if (code) return code;
code = tBufferPutI8(writer, compressInfo->type);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->dataOriginalSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->dataCompressedSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->offsetOriginalSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->offsetCompressedSize);
if (code) return code;
if ((code = tBufferPutU8(buffer, fmtVer))) return code;
if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code;
if ((code = tBufferPutI8(buffer, info->type))) return code;
if (IS_VAR_DATA_TYPE(info->type)) {
if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code;
if ((code = tBufferPutI32v(buffer, info->offsetCompressedSize))) return code;
}
if ((code = tBufferPutI32v(buffer, info->dataOriginalSize))) return code;
if ((code = tBufferPutI32v(buffer, info->dataCompressedSize))) return code;
return 0;
}
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo) {
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *info) {
int32_t code;
uint8_t formatVersion;
uint8_t fmtVer;
// format version
code = tBufferGetU8(reader, &formatVersion);
if (code) return code;
if (formatVersion == 0) {
code = tBufferGetI8(reader, &compressInfo->cmprAlg);
if (code) return code;
code = tBufferGetI8(reader, &compressInfo->type);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->dataOriginalSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->dataCompressedSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->offsetOriginalSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->offsetCompressedSize);
if (code) return code;
if ((code = tBufferGetU8(reader, &fmtVer))) return code;
if (fmtVer == 0) {
if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code;
if ((code = tBufferGetI8(reader, &info->type))) return code;
if (IS_VAR_DATA_TYPE(info->type)) {
if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code;
if ((code = tBufferGetI32v(reader, &info->offsetCompressedSize))) return code;
} else {
info->offsetOriginalSize = 0;
info->offsetCompressedSize = 0;
}
if ((code = tBufferGetI32v(reader, &info->dataOriginalSize))) return code;
if ((code = tBufferGetI32v(reader, &info->dataCompressedSize))) return code;
} else {
return TSDB_CODE_INVALID_DATA_FMT;
ASSERT(0);
}
return 0;

View File

@ -19,7 +19,8 @@
struct SDataFileReader {
SDataFileReaderConfig config[1];
uint8_t *bufArr[5];
SBuffer local[5];
SBuffer *buffers;
struct {
bool headFooterLoaded;
@ -89,9 +90,14 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) {
tBufferInit(reader[0]->local + i);
}
reader[0]->config[0] = config[0];
if (reader[0]->config->bufArr == NULL) {
reader[0]->config->bufArr = reader[0]->bufArr;
reader[0]->buffers = config->buffers;
if (reader[0]->buffers == NULL) {
reader[0]->buffers = reader[0]->local;
}
if (fname) {
@ -125,19 +131,14 @@ int32_t tsdbDataFileReaderClose(SDataFileReader **reader) {
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL);
#if 0
TARRAY2_DESTROY(reader[0]->dataBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->blockIdxArray, NULL);
#endif
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (reader[0]->fd[i]) {
tsdbCloseFile(&reader[0]->fd[i]);
}
}
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
tFree(reader[0]->bufArr[i]);
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) {
tBufferDestroy(reader[0]->local + i);
}
taosMemoryFree(reader[0]);
@ -188,38 +189,81 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size);
// load data
tBufferClear(&reader->buffers[0]);
code = tBufferEnsureCapacity(&reader->buffers[0], brinBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->buffers[0].data, brinBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
reader->buffers[0].size = brinBlk->dp->size;
#if 0
int32_t size = 0;
// decode brin block
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
tBrinBlockClear(brinBlock);
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[i], TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg,
&reader->config->bufArr[1], brinBlk->numRec * sizeof(int64_t), &reader->config->bufArr[2]);
brinBlock->numOfPKs = brinBlk->numOfPKs;
brinBlock->numOfRecords = brinBlk->numRec;
for (int32_t i = 0; i < 10; i++) {
SCompressInfo cinfo = {
.cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int64_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo,
brinBlock->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr1[i], reader->config->bufArr[1], brinBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += brinBlk->size[i];
br.offset += brinBlk->size[i];
}
for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[j], TSDB_DATA_TYPE_INT, brinBlk->cmprAlg,
&reader->config->bufArr[1], brinBlk->numRec * sizeof(int32_t), &reader->config->bufArr[2]);
for (int32_t i = 10; i < 15; i++) {
SCompressInfo cinfo = {
.cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_INT,
.compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int32_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo,
brinBlock->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr2[i], reader->config->bufArr[1], brinBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += brinBlk->size[j];
br.offset += brinBlk->size[i];
}
#endif
// primary keys
if (brinBlk->numOfPKs > 0) { // decode the primary keys
SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS];
SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS];
for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&br, firstInfos + i);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&br, lastInfos + i);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
SValueColumnCompressInfo *info = firstInfos + i;
int32_t totalCompressedSize = info->offsetCompressedSize + info->dataCompressedSize;
code = tValueColumnDecompress(tBufferGetDataAt(br.buffer, br.offset), totalCompressedSize, info,
brinBlock->firstKeyPKs + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += totalCompressedSize;
}
for (int32_t i = 0; i < brinBlk->numOfPKs; i++) {
SValueColumnCompressInfo *info = lastInfos + i;
int32_t totalCompressedSize = info->offsetCompressedSize + info->dataCompressedSize;
code = tValueColumnDecompress(tBufferGetDataAt(br.buffer, br.offset), totalCompressedSize, info,
brinBlock->lastKeyPKs + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += totalCompressedSize;
}
}
ASSERT(br.offset == br.buffer->size);
_exit:
if (code) {
@ -232,13 +276,15 @@ int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *re
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], record->blockSize);
// load data
tBufferClear(&reader->buffers[0]);
code = tBufferEnsureCapacity(&reader->buffers[0], record->blockSize);
TSDB_CHECK_CODE(code, lino, _exit);
code =
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize, 0);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->buffers[0].data, record->blockSize, 0);
TSDB_CHECK_CODE(code, lino, _exit);
reader->buffers[0].size = record->blockSize;
// decompress
code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
@ -611,7 +657,7 @@ struct SDataFileWriter {
SSkmInfo skmTb[1];
SSkmInfo skmRow[1];
uint8_t *bufArr[5];
SBuffer local[5];
SBuffer *buffers;
struct {
@ -673,8 +719,8 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
tBlockDataDestroy(writer->ctx->blockData);
tBrinBlockDestroy(writer->ctx->brinBlock);
for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
tFree(writer->bufArr[i]);
for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
tBufferDestroy(writer->local + i);
}
tDestroyTSchema(writer->skmRow->pTSchema);
@ -691,7 +737,7 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
SDataFileReaderConfig config[1] = {{
.tsdb = writer->config->tsdb,
.szPage = writer->config->szPage,
.bufArr = writer->config->bufArr,
.buffers = writer->buffers,
}};
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
@ -721,7 +767,10 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
writer->buffers = writer->config->buffers;
if (writer->buffers == NULL) {
writer->buffers = writer->local;
}
// open reader
code = tsdbDataFileWriterDoOpenReader(writer);

View File

@ -29,12 +29,12 @@ typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray;
typedef struct {
SFDataPtr brinBlkPtr[1];
SFDataPtr rsrvd[2];
char rsrvd[32];
} SHeadFooter;
typedef struct {
SFDataPtr tombBlkPtr[1];
SFDataPtr rsrvd[2];
char rsrvd[32];
} STombFooter;
// SDataFileReader =============================================
@ -46,7 +46,7 @@ typedef struct SDataFileReaderConfig {
bool exist;
STFile file;
} files[TSDB_FTYPE_MAX];
uint8_t **bufArr;
SBuffer *buffers;
} SDataFileReaderConfig;
int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config,
@ -83,7 +83,7 @@ typedef struct SDataFileWriterConfig {
} files[TSDB_FTYPE_MAX];
SSkmInfo *skmTb;
SSkmInfo *skmRow;
uint8_t **bufArr;
SBuffer *buffers;
} SDataFileWriterConfig;
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer);

View File

@ -29,17 +29,17 @@ struct STsdbSnapReader {
int64_t ever;
int8_t type;
uint8_t* aBuf[5];
SBuffer buffers[5];
SSkmInfo skmTb[1];
TFileSetRangeArray* fsrArr;
// context
struct {
int32_t fsrArrIdx;
int32_t fsrArrIdx;
STFileSetRange* fsr;
bool isDataDone;
bool isTombDone;
bool isDataDone;
bool isTombDone;
} ctx[1];
// reader
@ -68,7 +68,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
SDataFileReaderConfig config = {
.tsdb = reader->tsdb,
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
.bufArr = reader->aBuf,
.buffers = reader->buffers,
};
bool hasDataFile = false;
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
@ -1061,7 +1061,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang
writer[0]->compactVersion = INT64_MAX;
writer[0]->now = taosGetTimestampMs();
code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
code =
tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:

View File

@ -503,22 +503,20 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
if (statisBlk->numOfPKs > 0) {
SValueColumnCompressInfo firstKeyInfos[TD_MAX_PK_COLS];
SValueColumnCompressInfo lastKeyInfos[TD_MAX_PK_COLS];
SBufferReader bfReader;
tBufferReaderInit(&bfReader, true, size, &reader->buffers[0]);
SBufferReader br = BUFFER_READER_INITIALIZER(size, &reader->buffers[0]);
// decode compress info
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&bfReader, &firstKeyInfos[i]);
code = tValueColumnCompressInfoDecode(&br, &firstKeyInfos[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&bfReader, &lastKeyInfos[i]);
code = tValueColumnCompressInfoDecode(&br, &lastKeyInfos[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
size = bfReader.offset;
size = br.offset;
// decode value columns
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
@ -686,19 +684,17 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
// compress primary keys
if (statisBlk.numOfPKs > 0) {
SBufferWriter bfWriter;
SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg};
tBufferClear(&writer->buffers[0]);
tBufferClear(&writer->buffers[1]);
tBufferWriterInit(&bfWriter, true, 0, &writer->buffers[0]);
for (int32_t i = 0; i < statisBlk.numOfPKs; i++) {
code =
tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter);
code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -706,7 +702,7 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter);
code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}

View File

@ -1591,6 +1591,20 @@ _exit:
return code;
}
typedef struct SBlockDataCmprInfo {
// TODO
} SBlockDataCmprInfo;
int32_t tBlockDataCompress(SBlockData *blockData, SBlockDataCmprInfo *info, SBuffer *buffer, SBuffer *assist) {
// TODO
return 0;
}
int32_t tBlockDataDecompress(SBufferReader *reader, const SBlockDataCmprInfo *info, SBlockData *blockData) {
// TODO
return 0;
}
// SDiskDataHdr ==============================
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) {
int32_t n = 0;

View File

@ -114,15 +114,19 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco
ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs);
ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs);
code = tBufferAppend(&statisBlock->suids, &record->suid, sizeof(record->suid));
code = tBufferPutI64(&statisBlock->suids, record->suid);
if (code) return code;
code = tBufferAppend(&statisBlock->uids, &record->uid, sizeof(record->uid));
code = tBufferPutI64(&statisBlock->uids, record->uid);
if (code) return code;
code = tBufferAppend(&statisBlock->firstKeyTimestamps, &record->firstKey.ts, sizeof(record->firstKey.ts));
code = tBufferPutI64(&statisBlock->firstKeyTimestamps, record->firstKey.ts);
if (code) return code;
code = tBufferAppend(&statisBlock->lastKeyTimestamps, &record->lastKey.ts, sizeof(record->lastKey.ts));
code = tBufferPutI64(&statisBlock->lastKeyTimestamps, record->lastKey.ts);
if (code) return code;
code = tBufferAppend(&statisBlock->counts, &record->count, sizeof(record->count));
code = tBufferPutI64(&statisBlock->counts, record->count);
if (code) return code;
for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) {
@ -137,21 +141,31 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco
}
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) {
int32_t code;
int32_t code;
SBufferReader reader;
if (idx < 0 || idx >= statisBlock->numOfRecords) {
return TSDB_CODE_OUT_OF_RANGE;
}
code = tBufferGet(&statisBlock->suids, idx, sizeof(record->suid), &record->suid);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->suid), &statisBlock->suids);
code = tBufferGetI64(&reader, &record->suid);
if (code) return code;
code = tBufferGet(&statisBlock->uids, idx, sizeof(record->uid), &record->uid);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->uid), &statisBlock->uids);
code = tBufferGetI64(&reader, &record->uid);
if (code) return code;
code = tBufferGet(&statisBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.ts), &record->firstKey.ts);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->firstKey.ts), &statisBlock->firstKeyTimestamps);
code = tBufferGetI64(&reader, &record->firstKey.ts);
if (code) return code;
code = tBufferGet(&statisBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.ts), &record->lastKey.ts);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->lastKey.ts), &statisBlock->lastKeyTimestamps);
code = tBufferGetI64(&reader, &record->lastKey.ts);
if (code) return code;
code = tBufferGet(&statisBlock->counts, idx, sizeof(record->count), &record->count);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->count), &statisBlock->counts);
code = tBufferGetI64(&reader, &record->count);
if (code) return code;
record->firstKey.numOfPKs = statisBlock->numOfPKs;
@ -216,43 +230,60 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
}
ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs);
code = tBufferAppend(&brinBlock->suids, &record->suid, sizeof(record->suid));
code = tBufferPutI64(&brinBlock->suids, record->suid);
if (code) return code;
code = tBufferAppend(&brinBlock->uids, &record->uid, sizeof(record->uid));
code = tBufferPutI64(&brinBlock->uids, record->uid);
if (code) return code;
code = tBufferAppend(&brinBlock->firstKeyTimestamps, &record->firstKey.key.ts, sizeof(record->firstKey.key.ts));
code = tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts);
if (code) return code;
code = tBufferAppend(&brinBlock->firstKeyVersions, &record->firstKey.version, sizeof(record->firstKey.version));
code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version);
if (code) return code;
for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]);
if (code) return code;
}
code = tBufferAppend(&brinBlock->lastKeyTimestamps, &record->lastKey.key.ts, sizeof(record->lastKey.key.ts));
code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts);
if (code) return code;
code = tBufferAppend(&brinBlock->lastKeyVersions, &record->lastKey.version, sizeof(record->lastKey.version));
code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version);
if (code) return code;
for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]);
if (code) return code;
}
code = tBufferAppend(&brinBlock->minVers, &record->minVer, sizeof(record->minVer));
code = tBufferPutI64(&brinBlock->minVers, record->minVer);
if (code) return code;
code = tBufferAppend(&brinBlock->maxVers, &record->maxVer, sizeof(record->maxVer));
code = tBufferPutI64(&brinBlock->maxVers, record->maxVer);
if (code) return code;
code = tBufferAppend(&brinBlock->blockOffsets, &record->blockOffset, sizeof(record->blockOffset));
code = tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset);
if (code) return code;
code = tBufferAppend(&brinBlock->smaOffsets, &record->smaOffset, sizeof(record->smaOffset));
code = tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset);
if (code) return code;
code = tBufferAppend(&brinBlock->blockSizes, &record->blockSize, sizeof(record->blockSize));
code = tBufferPutI32(&brinBlock->blockSizes, record->blockSize);
if (code) return code;
code = tBufferAppend(&brinBlock->blockKeySizes, &record->blockKeySize, sizeof(record->blockKeySize));
code = tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize);
if (code) return code;
code = tBufferAppend(&brinBlock->smaSizes, &record->smaSize, sizeof(record->smaSize));
code = tBufferPutI32(&brinBlock->smaSizes, record->smaSize);
if (code) return code;
code = tBufferAppend(&brinBlock->numRows, &record->numRow, sizeof(record->numRow));
code = tBufferPutI32(&brinBlock->numRows, record->numRow);
if (code) return code;
code = tBufferAppend(&brinBlock->counts, &record->count, sizeof(record->count));
code = tBufferPutI32(&brinBlock->counts, record->count);
if (code) return code;
brinBlock->numOfRecords++;
@ -261,139 +292,172 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
}
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) {
int32_t code;
int32_t code;
SBufferReader reader;
if (idx < 0 || idx >= brinBlock->numOfRecords) {
return TSDB_CODE_OUT_OF_RANGE;
}
code = tBufferGet(&brinBlock->suids, idx, sizeof(record->suid), &record->suid);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->suids);
code = tBufferGetI64(&reader, &record->suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, idx, sizeof(record->uid), &record->uid);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->uids);
code = tBufferGetI64(&reader, &record->uid);
if (code) return code;
code = tBufferGet(&brinBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.key.ts), &record->firstKey.key.ts);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyTimestamps);
code = tBufferGetI64(&reader, &record->firstKey.key.ts);
if (code) return code;
code = tBufferGet(&brinBlock->firstKeyVersions, idx, sizeof(record->firstKey.version), &record->firstKey.version);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyVersions);
code = tBufferGetI64(&reader, &record->firstKey.version);
if (code) return code;
for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs;
record->firstKey.key.numOfPKs++) {
code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx,
&record->firstKey.key.pks[record->firstKey.key.numOfPKs]);
if (code) return code;
}
code = tBufferGet(&brinBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.key.ts), &record->lastKey.key.ts);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyTimestamps);
code = tBufferGetI64(&reader, &record->lastKey.key.ts);
if (code) return code;
code = tBufferGet(&brinBlock->lastKeyVersions, idx, sizeof(record->lastKey.version), &record->lastKey.version);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyVersions);
code = tBufferGetI64(&reader, &record->lastKey.version);
if (code) return code;
for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs;
record->lastKey.key.numOfPKs++) {
code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx,
&record->lastKey.key.pks[record->lastKey.key.numOfPKs]);
if (code) return code;
}
code = tBufferGet(&brinBlock->minVers, idx, sizeof(record->minVer), &record->minVer);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->minVers);
code = tBufferGetI64(&reader, &record->minVer);
if (code) return code;
code = tBufferGet(&brinBlock->maxVers, idx, sizeof(record->maxVer), &record->maxVer);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->maxVers);
code = tBufferGetI64(&reader, &record->maxVer);
if (code) return code;
code = tBufferGet(&brinBlock->blockOffsets, idx, sizeof(record->blockOffset), &record->blockOffset);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->blockOffsets);
code = tBufferGetI64(&reader, &record->blockOffset);
if (code) return code;
code = tBufferGet(&brinBlock->smaOffsets, idx, sizeof(record->smaOffset), &record->smaOffset);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->smaOffsets);
code = tBufferGetI64(&reader, &record->smaOffset);
if (code) return code;
code = tBufferGet(&brinBlock->blockSizes, idx, sizeof(record->blockSize), &record->blockSize);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockSizes);
code = tBufferGetI32(&reader, &record->blockSize);
if (code) return code;
code = tBufferGet(&brinBlock->blockKeySizes, idx, sizeof(record->blockKeySize), &record->blockKeySize);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockKeySizes);
code = tBufferGetI32(&reader, &record->blockKeySize);
if (code) return code;
code = tBufferGet(&brinBlock->smaSizes, idx, sizeof(record->smaSize), &record->smaSize);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->smaSizes);
code = tBufferGetI32(&reader, &record->smaSize);
if (code) return code;
code = tBufferGet(&brinBlock->numRows, idx, sizeof(record->numRow), &record->numRow);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->numRows);
code = tBufferGetI32(&reader, &record->numRow);
if (code) return code;
code = tBufferGet(&brinBlock->counts, idx, sizeof(record->count), &record->count);
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->counts);
code = tBufferGetI32(&reader, &record->count);
if (code) return code;
return 0;
}
int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) {
int32_t code;
SBuffer *helperBuffer = NULL; // TODO
// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) {
// int32_t code;
// SBuffer *helperBuffer = NULL; // TODO
brinBlk->dp[0].size = 0;
brinBlk->numRec = brinBlock->numOfRecords;
brinBlk->numOfPKs = brinBlock->numOfPKs;
// brinBlk->dp[0].size = 0;
// brinBlk->numRec = brinBlock->numOfRecords;
// brinBlk->numOfPKs = brinBlock->numOfPKs;
// minTbid
code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid);
if (code) return code;
// maxTbid
code =
tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), &brinBlk->maxTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), &brinBlk->maxTbid.uid);
if (code) return code;
// minVer and maxVer
const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
brinBlk->minVer = minVers[0];
brinBlk->maxVer = maxVers[0];
for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) {
if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i];
if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i];
}
// // minTbid
// code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid);
// if (code) return code;
// code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid);
// if (code) return code;
// // maxTbid
// code =
// tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid),
// &brinBlk->maxTbid.suid);
// if (code) return code;
// code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid),
// &brinBlk->maxTbid.uid); if (code) return code;
// // minVer and maxVer
// const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
// const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
// brinBlk->minVer = minVers[0];
// brinBlk->maxVer = maxVers[0];
// for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) {
// if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i];
// if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i];
// }
// compress data
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
SBuffer *bf = &brinBlock->buffers[i];
SCompressInfo info = {
.cmprAlg = brinBlk->cmprAlg,
};
// // compress data
// for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
// SBuffer *bf = &brinBlock->buffers[i];
// SCompressInfo info = {
// .cmprAlg = brinBlk->cmprAlg,
// };
if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_BIGINT;
} else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_INT;
} else {
ASSERT(0);
}
// if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
// info.dataType = TSDB_DATA_TYPE_BIGINT;
// } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
// info.dataType = TSDB_DATA_TYPE_INT;
// } else {
// ASSERT(0);
// }
code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer);
if (code) return code;
brinBlk->size[i] = info.compressedSize;
brinBlk->dp[0].size += info.compressedSize;
}
// code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer);
// if (code) return code;
// brinBlk->size[i] = info.compressedSize;
// brinBlk->dp[0].size += info.compressedSize;
// }
// encode primary keys
SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS];
SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS];
// // encode primary keys
// SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS];
// SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS];
for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
SValueColumn *vc = &brinBlock->firstKeyPKs[i];
firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer);
if (code) return code;
}
// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
// SValueColumn *vc = &brinBlock->firstKeyPKs[i];
// firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
// code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer);
// if (code) return code;
// }
for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
SValueColumn *vc = &brinBlock->lastKeyPKs[i];
lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer);
if (code) return code;
}
// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
// SValueColumn *vc = &brinBlock->lastKeyPKs[i];
// lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
// code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer);
// if (code) return code;
// }
return 0;
}
// return 0;
// }
int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
// if (brinBlk->fmtVersion == 0) {
// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock);
// } else if (brinBlk->fmtVersion == 1) {
// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock);
// } else {
// ASSERT(0);
// }
return 0;
}
// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
// if (brinBlk->fmtVersion == 0) {
// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock);
// } else if (brinBlk->fmtVersion == 1) {
// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock);
// } else {
// ASSERT(0);
// }
// return 0;
// }
// other apis ----------
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) {

View File

@ -84,11 +84,11 @@ typedef struct {
union {
SBuffer buffers[5];
struct {
SBuffer suids;
SBuffer uids;
SBuffer firstKeyTimestamps;
SBuffer lastKeyTimestamps;
SBuffer counts;
SBuffer suids; // int64_t
SBuffer uids; // int64_t
SBuffer firstKeyTimestamps; // int64_t
SBuffer lastKeyTimestamps; // int64_t
SBuffer counts; // int64_t
};
};
SValueColumn firstKeyPKs[TD_MAX_PK_COLS];
@ -180,8 +180,8 @@ int32_t tBrinBlockDestroy(SBrinBlock *brinBlock);
int32_t tBrinBlockClear(SBrinBlock *brinBlock);
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record);
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record);
int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer);
int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer);
// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// other apis
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);

View File

@ -28,74 +28,72 @@ TEST(BufferTest, forwardWriteAndRead) {
taosSeedRand(taosGetTimestampSec());
// write
SBufferWriter writer = BUFFER_WRITER_INITIALIZER(forward, tBufferGetSize(&buffer), &buffer);
/* fix-len struct */
STestStruct testStruct = {1, 2};
GTEST_ASSERT_EQ(tBufferPutFixed(&writer, &testStruct, sizeof(STestStruct)), 0);
GTEST_ASSERT_EQ(tBufferPut(&buffer, &testStruct, sizeof(STestStruct)), 0);
/* int8_t */
int8_t i8 = taosRand() % UINT8_MAX - INT8_MAX;
GTEST_ASSERT_EQ(tBufferPutI8(&writer, i8), 0);
GTEST_ASSERT_EQ(tBufferPutI8(&buffer, i8), 0);
/* int16_t */
int8_t i16 = taosRand() % UINT16_MAX - INT16_MAX;
GTEST_ASSERT_EQ(tBufferPutI16(&writer, i16), 0);
GTEST_ASSERT_EQ(tBufferPutI16(&buffer, i16), 0);
/* int32_t */
int8_t i32 = taosRand();
GTEST_ASSERT_EQ(tBufferPutI32(&writer, i32), 0);
GTEST_ASSERT_EQ(tBufferPutI32(&buffer, i32), 0);
/* int64_t */
int64_t i64 = taosRand();
GTEST_ASSERT_EQ(tBufferPutI64(&writer, i64), 0);
GTEST_ASSERT_EQ(tBufferPutI64(&buffer, i64), 0);
/* uint8_t */
uint8_t u8 = taosRand() % UINT8_MAX;
GTEST_ASSERT_EQ(tBufferPutU8(&writer, u8), 0);
GTEST_ASSERT_EQ(tBufferPutU8(&buffer, u8), 0);
/* uint16_t */
uint16_t u16 = taosRand() % UINT16_MAX;
GTEST_ASSERT_EQ(tBufferPutU16(&writer, u16), 0);
GTEST_ASSERT_EQ(tBufferPutU16(&buffer, u16), 0);
/* uint32_t */
uint32_t u32 = taosRand();
GTEST_ASSERT_EQ(tBufferPutU32(&writer, u32), 0);
GTEST_ASSERT_EQ(tBufferPutU32(&buffer, u32), 0);
/* uint64_t */
uint64_t u64 = taosRand();
GTEST_ASSERT_EQ(tBufferPutU64(&writer, u64), 0);
GTEST_ASSERT_EQ(tBufferPutU64(&buffer, u64), 0);
/* float */
float f = (float)taosRand() / (float)taosRand();
GTEST_ASSERT_EQ(tBufferPutF32(&writer, f), 0);
GTEST_ASSERT_EQ(tBufferPutF32(&buffer, f), 0);
/* double */
double d = (double)taosRand() / (double)taosRand();
GTEST_ASSERT_EQ(tBufferPutF64(&writer, d), 0);
GTEST_ASSERT_EQ(tBufferPutF64(&buffer, d), 0);
/* binary */
uint8_t binary[10];
for (int32_t i = 0; i < sizeof(binary); ++i) {
binary[i] = taosRand() % UINT8_MAX;
}
GTEST_ASSERT_EQ(tBufferPutBinary(&writer, binary, sizeof(binary)), 0);
GTEST_ASSERT_EQ(tBufferPutBinary(&buffer, binary, sizeof(binary)), 0);
/* cstr */
const char *cstr = "hello world";
GTEST_ASSERT_EQ(tBufferPutCStr(&writer, cstr), 0);
GTEST_ASSERT_EQ(tBufferPutCStr(&buffer, cstr), 0);
/* uint16v_t */
uint16_t u16v[] = {0, 127, 128, 129, 16384, 16385, 16386, UINT16_MAX};
for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU16v(&writer, u16v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutU16v(&buffer, u16v[i]), 0);
}
/* uint32v_t */
uint32_t u32v[] = {0, 127, 128, 129, 16384, 16385, 16386, (1 << 21) - 1,
(1 << 21), (1 << 21) + 1, (1 << 28) - 1, (1 << 28), (1 << 28) + 1, UINT32_MAX};
for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU32v(&writer, u32v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutU32v(&buffer, u32v[i]), 0);
}
/* uint64v_t */
@ -129,7 +127,7 @@ TEST(BufferTest, forwardWriteAndRead) {
(1ul << (7 * 9)) + 1,
UINT64_MAX};
for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU64v(&writer, u64v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutU64v(&buffer, u64v[i]), 0);
}
/* int16v_t */
@ -153,7 +151,7 @@ TEST(BufferTest, forwardWriteAndRead) {
INT16_MAX,
};
for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI16v(&writer, i16v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutI16v(&buffer, i16v[i]), 0);
}
/* int32v_t */
@ -189,7 +187,7 @@ TEST(BufferTest, forwardWriteAndRead) {
INT32_MAX,
};
for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI32v(&writer, i32v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutI32v(&buffer, i32v[i]), 0);
}
/* int64v_t */
@ -248,17 +246,15 @@ TEST(BufferTest, forwardWriteAndRead) {
INT64_MAX,
};
for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI64v(&writer, i64v[i]), 0);
GTEST_ASSERT_EQ(tBufferPutI64v(&buffer, i64v[i]), 0);
}
tBufferWriterDestroy(&writer);
// read
SBufferReader reader = BUFFER_READER_INITIALIZER(forward, 0, &buffer);
SBufferReader reader = BUFFER_READER_INITIALIZER(0, &buffer);
/* fix-len struct */
STestStruct testStruct2 = {1, 2};
GTEST_ASSERT_EQ(tBufferGetFixed(&reader, &testStruct2, sizeof(STestStruct)), 0);
GTEST_ASSERT_EQ(tBufferGet(&reader, sizeof(STestStruct), &testStruct2), 0);
GTEST_ASSERT_EQ(testStruct.value1, testStruct2.value1);
GTEST_ASSERT_EQ(testStruct.value2, testStruct2.value2);
GTEST_ASSERT_EQ(testStruct.value3, testStruct2.value3);
@ -372,359 +368,3 @@ TEST(BufferTest, forwardWriteAndRead) {
// clear
tBufferDestroy(&buffer);
}
TEST(BufferTest, backwardWriteAndRead) {
int32_t code = 0;
bool forward = false;
SBuffer buffer;
tBufferInit(&buffer);
taosSeedRand(taosGetTimestampSec());
// write
SBufferWriter writer = BUFFER_WRITER_INITIALIZER(forward, 4096, &buffer);
/* fix-len struct */
STestStruct testStruct = {1, 2};
GTEST_ASSERT_EQ(tBufferPutFixed(&writer, &testStruct, sizeof(STestStruct)), 0);
/* int8_t */
int8_t i8 = taosRand() % UINT8_MAX - INT8_MAX;
GTEST_ASSERT_EQ(tBufferPutI8(&writer, i8), 0);
/* int16_t */
int8_t i16 = taosRand() % UINT16_MAX - INT16_MAX;
GTEST_ASSERT_EQ(tBufferPutI16(&writer, i16), 0);
/* int32_t */
int8_t i32 = taosRand();
GTEST_ASSERT_EQ(tBufferPutI32(&writer, i32), 0);
/* int64_t */
int64_t i64 = taosRand();
GTEST_ASSERT_EQ(tBufferPutI64(&writer, i64), 0);
/* uint8_t */
uint8_t u8 = taosRand() % UINT8_MAX;
GTEST_ASSERT_EQ(tBufferPutU8(&writer, u8), 0);
/* uint16_t */
uint16_t u16 = taosRand() % UINT16_MAX;
GTEST_ASSERT_EQ(tBufferPutU16(&writer, u16), 0);
/* uint32_t */
uint32_t u32 = taosRand();
GTEST_ASSERT_EQ(tBufferPutU32(&writer, u32), 0);
/* uint64_t */
uint64_t u64 = taosRand();
GTEST_ASSERT_EQ(tBufferPutU64(&writer, u64), 0);
/* float */
float f = (float)taosRand() / (float)taosRand();
GTEST_ASSERT_EQ(tBufferPutF32(&writer, f), 0);
/* double */
double d = (double)taosRand() / (double)taosRand();
GTEST_ASSERT_EQ(tBufferPutF64(&writer, d), 0);
/* binary */
uint8_t binary[10];
for (int32_t i = 0; i < sizeof(binary); ++i) {
binary[i] = taosRand() % UINT8_MAX;
}
GTEST_ASSERT_EQ(tBufferPutBinary(&writer, binary, sizeof(binary)), 0);
/* cstr */
const char *cstr = "hello world";
GTEST_ASSERT_EQ(tBufferPutCStr(&writer, cstr), 0);
/* uint16v_t */
uint16_t u16v[] = {0, 127, 128, 129, 16384, 16385, 16386, UINT16_MAX};
for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU16v(&writer, u16v[i]), 0);
}
/* uint32v_t */
uint32_t u32v[] = {0, 127, 128, 129, 16384, 16385, 16386, (1 << 21) - 1,
(1 << 21), (1 << 21) + 1, (1 << 28) - 1, (1 << 28), (1 << 28) + 1, UINT32_MAX};
for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU32v(&writer, u32v[i]), 0);
}
/* uint64v_t */
uint64_t u64v[] = {0, // 0
(1ul << (7 * 1)) - 1,
(1ul << (7 * 1)),
(1ul << (7 * 1)) + 1,
(1ul << (7 * 2)) - 1,
(1ul << (7 * 2)),
(1ul << (7 * 2)) + 1,
(1ul << (7 * 3)) - 1,
(1ul << (7 * 3)),
(1ul << (7 * 3)) + 1,
(1ul << (7 * 4)) - 1,
(1ul << (7 * 4)),
(1ul << (7 * 4)) + 1,
(1ul << (7 * 5)) - 1,
(1ul << (7 * 5)),
(1ul << (7 * 5)) + 1,
(1ul << (7 * 6)) - 1,
(1ul << (7 * 6)),
(1ul << (7 * 6)) + 1,
(1ul << (7 * 7)) - 1,
(1ul << (7 * 7)),
(1ul << (7 * 7)) + 1,
(1ul << (7 * 8)) - 1,
(1ul << (7 * 8)),
(1ul << (7 * 8)) + 1,
(1ul << (7 * 9)) - 1,
(1ul << (7 * 9)),
(1ul << (7 * 9)) + 1,
UINT64_MAX};
for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutU64v(&writer, u64v[i]), 0);
}
/* int16v_t */
int16_t i16v[] = {
INT16_MIN, //
-((1 << (7 * 1)) - 1),
-((1 << (7 * 1))),
-((1 << (7 * 1)) + 1),
-((1 << (7 * 2)) - 1),
-((1 << (7 * 2))),
-((1 << (7 * 2)) + 1),
(1 << (7 * 0)) - 1,
(1 << (7 * 0)),
(1 << (7 * 0)) + 1,
(1 << (7 * 1)) - 1,
(1 << (7 * 1)),
(1 << (7 * 1)) + 1,
(1 << (7 * 2)) - 1,
(1 << (7 * 2)),
(1 << (7 * 2)) + 1,
INT16_MAX,
};
for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI16v(&writer, i16v[i]), 0);
}
/* int32v_t */
int32_t i32v[] = {
INT32_MIN, //
-((1 << (7 * 1)) - 1),
-((1 << (7 * 1))),
-((1 << (7 * 1)) + 1),
-((1 << (7 * 2)) - 1),
-((1 << (7 * 2))),
-((1 << (7 * 2)) + 1),
-((1 << (7 * 3)) - 1),
-((1 << (7 * 3))),
-((1 << (7 * 3)) + 1),
-((1 << (7 * 4)) - 1),
-((1 << (7 * 4))),
-((1 << (7 * 4)) + 1),
(1 << (7 * 0)) - 1,
(1 << (7 * 0)),
(1 << (7 * 0)) + 1,
(1 << (7 * 1)) - 1,
(1 << (7 * 1)),
(1 << (7 * 1)) + 1,
(1 << (7 * 2)) - 1,
(1 << (7 * 2)),
(1 << (7 * 2)) + 1,
(1 << (7 * 3)) - 1,
(1 << (7 * 3)),
(1 << (7 * 3)) + 1,
(1 << (7 * 4)) - 1,
(1 << (7 * 4)),
(1 << (7 * 4)) + 1,
INT32_MAX,
};
for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI32v(&writer, i32v[i]), 0);
}
/* int64v_t */
int64_t i64v[] = {
INT64_MIN, //
-((1l << (7 * 1)) - 1),
-((1l << (7 * 1))),
-((1l << (7 * 1)) + 1),
-((1l << (7 * 2)) - 1),
-((1l << (7 * 2))),
-((1l << (7 * 2)) + 1),
-((1l << (7 * 3)) - 1),
-((1l << (7 * 3))),
-((1l << (7 * 3)) + 1),
-((1l << (7 * 4)) - 1),
-((1l << (7 * 4))),
-((1l << (7 * 4)) + 1),
-((1l << (7 * 5)) - 1),
-((1l << (7 * 5))),
-((1l << (7 * 5)) + 1),
-((1l << (7 * 6)) - 1),
-((1l << (7 * 6))),
-((1l << (7 * 6)) + 1),
-((1l << (7 * 7)) - 1),
-((1l << (7 * 7))),
-((1l << (7 * 7)) + 1),
-((1l << (7 * 8)) - 1),
-((1l << (7 * 8))),
-((1l << (7 * 8)) + 1),
-((1l << (7 * 9)) + 1),
((1l << (7 * 1)) - 1),
((1l << (7 * 1))),
((1l << (7 * 1)) + 1),
((1l << (7 * 2)) - 1),
((1l << (7 * 2))),
((1l << (7 * 2)) + 1),
((1l << (7 * 3)) - 1),
((1l << (7 * 3))),
((1l << (7 * 3)) + 1),
((1l << (7 * 4)) - 1),
((1l << (7 * 4))),
((1l << (7 * 4)) + 1),
((1l << (7 * 5)) - 1),
((1l << (7 * 5))),
((1l << (7 * 5)) + 1),
((1l << (7 * 6)) - 1),
((1l << (7 * 6))),
((1l << (7 * 6)) + 1),
((1l << (7 * 7)) - 1),
((1l << (7 * 7))),
((1l << (7 * 7)) + 1),
((1l << (7 * 8)) - 1),
((1l << (7 * 8))),
((1l << (7 * 8)) + 1),
((1l << (7 * 9)) + 1),
INT64_MAX,
};
for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferPutI64v(&writer, i64v[i]), 0);
}
tBufferWriterDestroy(&writer);
// read
SBufferReader reader = BUFFER_READER_INITIALIZER(forward, 4096, &buffer);
/* fix-len struct */
STestStruct testStruct2 = {1, 2};
GTEST_ASSERT_EQ(tBufferGetFixed(&reader, &testStruct2, sizeof(STestStruct)), 0);
GTEST_ASSERT_EQ(testStruct.value1, testStruct2.value1);
GTEST_ASSERT_EQ(testStruct.value2, testStruct2.value2);
GTEST_ASSERT_EQ(testStruct.value3, testStruct2.value3);
/* int8_t */
int8_t i8_2 = 97;
GTEST_ASSERT_EQ(tBufferGetI8(&reader, &i8_2), 0);
GTEST_ASSERT_EQ(i8, i8_2);
/* int16_t */
int16_t i16_2;
GTEST_ASSERT_EQ(tBufferGetI16(&reader, &i16_2), 0);
GTEST_ASSERT_EQ(i16, i16_2);
/* int32_t */
int32_t i32_2;
GTEST_ASSERT_EQ(tBufferGetI32(&reader, &i32_2), 0);
GTEST_ASSERT_EQ(i32, i32_2);
/* int64_t */
int64_t i64_2;
GTEST_ASSERT_EQ(tBufferGetI64(&reader, &i64_2), 0);
GTEST_ASSERT_EQ(i64, i64_2);
/* uint8_t */
uint8_t u8_2;
GTEST_ASSERT_EQ(tBufferGetU8(&reader, &u8_2), 0);
GTEST_ASSERT_EQ(u8, u8_2);
/* uint16_t */
uint16_t u16_2;
GTEST_ASSERT_EQ(tBufferGetU16(&reader, &u16_2), 0);
GTEST_ASSERT_EQ(u16, u16_2);
/* uint32_t */
uint32_t u32_2;
GTEST_ASSERT_EQ(tBufferGetU32(&reader, &u32_2), 0);
GTEST_ASSERT_EQ(u32, u32_2);
/* uint64_t */
uint64_t u64_2;
GTEST_ASSERT_EQ(tBufferGetU64(&reader, &u64_2), 0);
GTEST_ASSERT_EQ(u64, u64_2);
/* float */
float f_2;
GTEST_ASSERT_EQ(tBufferGetF32(&reader, &f_2), 0);
GTEST_ASSERT_EQ(f, f_2);
/* double */
double d_2;
GTEST_ASSERT_EQ(tBufferGetF64(&reader, &d_2), 0);
GTEST_ASSERT_EQ(d, d_2);
/* binary */
const void *binary2;
uint32_t binarySize;
GTEST_ASSERT_EQ(tBufferGetBinary(&reader, &binary2, &binarySize), 0);
GTEST_ASSERT_EQ(memcmp(binary, binary2, sizeof(binary)), 0);
GTEST_ASSERT_EQ(binarySize, sizeof(binary));
/* cstr */
const char *cstr2;
GTEST_ASSERT_EQ(tBufferGetCStr(&reader, &cstr2), 0);
GTEST_ASSERT_EQ(strcmp(cstr, cstr2), 0);
/* uint16v_t */
uint16_t u16v2[sizeof(u16v) / sizeof(u16v[0])];
for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetU16v(&reader, &u16v2[i]), 0);
GTEST_ASSERT_EQ(u16v[i], u16v2[i]);
}
/* uint32v_t */
uint32_t u32v2[sizeof(u32v) / sizeof(u32v[0])];
for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetU32v(&reader, &u32v2[i]), 0);
GTEST_ASSERT_EQ(u32v[i], u32v2[i]);
}
/* uint64v_t */
uint64_t u64v2[sizeof(u64v) / sizeof(u64v[0])];
for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetU64v(&reader, &u64v2[i]), 0);
GTEST_ASSERT_EQ(u64v[i], u64v2[i]);
}
/* int16v_t */
int16_t i16v2[sizeof(i16v) / sizeof(i16v[0])];
for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetI16v(&reader, &i16v2[i]), 0);
GTEST_ASSERT_EQ(i16v[i], i16v2[i]);
}
/* int32v_t */
int32_t i32v2[sizeof(i32v) / sizeof(i32v[0])];
for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetI32v(&reader, &i32v2[i]), 0);
GTEST_ASSERT_EQ(i32v[i], i32v2[i]);
}
/* int64v_t */
int64_t i64v2[sizeof(i64v) / sizeof(i64v[0])];
for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) {
GTEST_ASSERT_EQ(tBufferGetI64v(&reader, &i64v2[i]), 0);
GTEST_ASSERT_EQ(i64v[i], i64v2[i]);
}
tBufferReaderDestroy(&reader);
GTEST_ASSERT_EQ(reader.offset, writer.offset);
// clear
tBufferDestroy(&buffer);
}