diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 08e41b2115..46b899372d 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -112,7 +112,7 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value); int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist); int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, SValueColumn *valCol, SBuffer *buffer); -int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); +int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBuffer *buffer); int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); int32_t tValueCompare(const SValue *tv1, const SValue *tv2); diff --git a/include/util/tbuffer.h b/include/util/tbuffer.h index 56645f4403..a964dde720 100644 --- a/include/util/tbuffer.h +++ b/include/util/tbuffer.h @@ -32,45 +32,38 @@ static int32_t tBufferInit(SBuffer *buffer); static int32_t tBufferDestroy(SBuffer *buffer); static int32_t tBufferClear(SBuffer *buffer); static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity); -static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size); -static int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data); +static int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size); +static int32_t tBufferPutI8(SBuffer *buffer, int8_t value); +static int32_t tBufferPutI16(SBuffer *buffer, int16_t value); +static int32_t tBufferPutI32(SBuffer *buffer, int32_t value); +static int32_t tBufferPutI64(SBuffer *buffer, int64_t value); +static int32_t tBufferPutU8(SBuffer *buffer, uint8_t value); +static int32_t tBufferPutU16(SBuffer *buffer, uint16_t value); +static int32_t tBufferPutU32(SBuffer *buffer, uint32_t value); +static int32_t tBufferPutU64(SBuffer *buffer, uint64_t value); +static int32_t tBufferPutI16v(SBuffer *buffer, int16_t value); +static int32_t tBufferPutI32v(SBuffer *buffer, int32_t value); +static int32_t tBufferPutI64v(SBuffer *buffer, int64_t value); +static int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value); +static int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value); +static int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value); +static int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size); +static int32_t tBufferPutCStr(SBuffer *buffer, const char *str); +static int32_t tBufferPutF32(SBuffer *buffer, float value); +static int32_t tBufferPutF64(SBuffer *buffer, double value); + #define tBufferGetSize(buffer) ((buffer)->size) #define tBufferGetCapacity(buffer) ((buffer)->capacity) #define tBufferGetData(buffer) ((buffer)->data) #define tBufferGetDataAt(buffer, idx) ((char *)(buffer)->data + (idx)) #define tBufferGetDataEnd(buffer) ((char *)(buffer)->data + (buffer)->size) -// SBufferWriter -#define BUFFER_WRITER_INITIALIZER(forward, offset, buffer) ((SBufferWriter){forward, offset, buffer}) -#define tBufferWriterDestroy(writer) ((void)0) -#define tBufferWriterGetOffset(writer) ((writer)->offset) -static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer); -static int32_t tBufferPutFixed(SBufferWriter *writer, const void *data, uint32_t size); -static int32_t tBufferPutI8(SBufferWriter *writer, int8_t value); -static int32_t tBufferPutI16(SBufferWriter *writer, int16_t value); -static int32_t tBufferPutI32(SBufferWriter *writer, int32_t value); -static int32_t tBufferPutI64(SBufferWriter *writer, int64_t value); -static int32_t tBufferPutU8(SBufferWriter *writer, uint8_t value); -static int32_t tBufferPutU16(SBufferWriter *writer, uint16_t value); -static int32_t tBufferPutU32(SBufferWriter *writer, uint32_t value); -static int32_t tBufferPutU64(SBufferWriter *writer, uint64_t value); -static int32_t tBufferPutI16v(SBufferWriter *writer, int16_t value); -static int32_t tBufferPutI32v(SBufferWriter *writer, int32_t value); -static int32_t tBufferPutI64v(SBufferWriter *writer, int64_t value); -static int32_t tBufferPutU16v(SBufferWriter *writer, uint16_t value); -static int32_t tBufferPutU32v(SBufferWriter *writer, uint32_t value); -static int32_t tBufferPutU64v(SBufferWriter *writer, uint64_t value); -static int32_t tBufferPutBinary(SBufferWriter *writer, const void *data, uint32_t size); -static int32_t tBufferPutCStr(SBufferWriter *writer, const char *str); -static int32_t tBufferPutF32(SBufferWriter *writer, float value); -static int32_t tBufferPutF64(SBufferWriter *writer, double value); - // SBufferReader -#define BUFFER_READER_INITIALIZER(forward, offset, buffer) ((SBufferReader){forward, offset, buffer}) -#define tBufferReaderDestroy(reader) ((void)0) -#define tBufferReaderGetOffset(reader) ((reader)->offset) -static int32_t tBufferReaderInit(SBufferReader *reader, bool forward, uint32_t offset, SBuffer *buffer); -static int32_t tBufferGetFixed(SBufferReader *reader, void *data, uint32_t size); +#define BUFFER_READER_INITIALIZER(offset, buffer) ((SBufferReader){offset, buffer}) +#define tBufferReaderDestroy(reader) ((void)0) +#define tBufferReaderGetOffset(reader) ((reader)->offset) +static int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data); +static int32_t tBufferReaderInit(SBufferReader *reader, uint32_t offset, SBuffer *buffer); static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value); static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value); static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value); diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc index aad4bf4acf..99d0fac593 100644 --- a/include/util/tbuffer.inc +++ b/include/util/tbuffer.inc @@ -29,7 +29,6 @@ struct SBufferWriter { }; struct SBufferReader { - bool forward; uint32_t offset; SBuffer *buffer; }; @@ -73,7 +72,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa return 0; } -static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) { +static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size) { int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size); if (code) return code; memcpy((char *)buffer->data + buffer->size, data, size); @@ -81,184 +80,132 @@ static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uin return 0; } -static FORCE_INLINE int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data) { - if (index < 0 || (index + 1) * size > buffer->size) { - return TSDB_CODE_OUT_OF_RANGE; - } - memcpy(data, (char *)buffer->data + index * size, size); - return 0; +static FORCE_INLINE int32_t tBufferPutI8(SBuffer *buffer, int8_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -// SBufferWriter -static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer) { - writer->forward = forward; - writer->offset = offset; - writer->buffer = buffer; - return 0; +static FORCE_INLINE int32_t tBufferPutI16(SBuffer *buffer, int16_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutFixed(SBufferWriter *writer, const void *data, uint32_t size) { - if (!writer->forward && writer->offset < size) { - return TSDB_CODE_OPS_NOT_SUPPORT; - } - - int32_t code = tBufferEnsureCapacity(writer->buffer, writer->forward ? writer->offset + size : writer->offset); - if (code) return code; - - if (writer->forward) { - memcpy((char *)writer->buffer->data + writer->offset, data, size); - writer->offset += size; - } else { - writer->offset -= size; - memcpy((char *)writer->buffer->data + writer->offset, data, size); - } - return 0; +static FORCE_INLINE int32_t tBufferPutI32(SBuffer *buffer, int32_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutI8(SBufferWriter *writer, int8_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); +static FORCE_INLINE int32_t tBufferPutI64(SBuffer *buffer, int64_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutI16(SBufferWriter *writer, int16_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); +static FORCE_INLINE int32_t tBufferPutU8(SBuffer *buffer, uint8_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutI32(SBufferWriter *writer, int32_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); +static FORCE_INLINE int32_t tBufferPutU16(SBuffer *buffer, uint16_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutI64(SBufferWriter *writer, int64_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); +static FORCE_INLINE int32_t tBufferPutU32(SBuffer *buffer, uint32_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutU8(SBufferWriter *writer, uint8_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); +static FORCE_INLINE int32_t tBufferPutU64(SBuffer *buffer, uint64_t value) { + return tBufferPut(buffer, &value, sizeof(value)); } -static FORCE_INLINE int32_t tBufferPutU16(SBufferWriter *writer, uint16_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); -} +static FORCE_INLINE int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value) { return tBufferPutU64v(buffer, value); } -static FORCE_INLINE int32_t tBufferPutU32(SBufferWriter *writer, uint32_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); -} +static FORCE_INLINE int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value) { return tBufferPutU64v(buffer, value); } -static FORCE_INLINE int32_t tBufferPutU64(SBufferWriter *writer, uint64_t value) { - return tBufferPutFixed(writer, &value, sizeof(value)); -} - -static FORCE_INLINE int32_t tBufferPutU64v(SBufferWriter *writer, uint64_t value) { +static FORCE_INLINE int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value) { int32_t code; while (value >= 0x80) { - code = tBufferPutU8(writer, (value & 0x7F) | 0x80); + code = tBufferPutU8(buffer, (value & 0x7F) | 0x80); if (code) return code; value >>= 7; } - return tBufferPutU8(writer, value); + return tBufferPutU8(buffer, value); } -static FORCE_INLINE int32_t tBufferPutU16v(SBufferWriter *writer, uint16_t value) { - return tBufferPutU64v(writer, value); +static FORCE_INLINE int32_t tBufferPutI16v(SBuffer *buffer, int16_t value) { + return tBufferPutU64v(buffer, ZIGZAGE(int16_t, value)); } -static FORCE_INLINE int32_t tBufferPutU32v(SBufferWriter *writer, uint32_t value) { - return tBufferPutU64v(writer, value); +static FORCE_INLINE int32_t tBufferPutI32v(SBuffer *buffer, int32_t value) { + return tBufferPutU64v(buffer, ZIGZAGE(int32_t, value)); } -static FORCE_INLINE int32_t tBufferPutI16v(SBufferWriter *writer, int16_t value) { - return tBufferPutU16v(writer, ZIGZAGE(int16_t, value)); +static FORCE_INLINE int32_t tBufferPutI64v(SBuffer *buffer, int64_t value) { + return tBufferPutU64v(buffer, ZIGZAGE(int64_t, value)); } -static FORCE_INLINE int32_t tBufferPutI32v(SBufferWriter *writer, int32_t value) { - return tBufferPutU32v(writer, ZIGZAGE(int32_t, value)); -} - -static FORCE_INLINE int32_t tBufferPutI64v(SBufferWriter *writer, int64_t value) { - return tBufferPutU64v(writer, ZIGZAGE(int64_t, value)); -} - -static FORCE_INLINE int32_t tBufferPutBinary(SBufferWriter *writer, const void *data, uint32_t size) { - int32_t code = tBufferPutU32(writer, size); +static FORCE_INLINE int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size) { + int32_t code = tBufferPutU32v(buffer, size); if (code) return code; - return tBufferPutFixed(writer, data, size); + return tBufferPut(buffer, data, size); } -static FORCE_INLINE int32_t tBufferPutCStr(SBufferWriter *writer, const char *str) { - return tBufferPutBinary(writer, str, strlen(str) + 1); +static FORCE_INLINE int32_t tBufferPutCStr(SBuffer *buffer, const char *str) { + return tBufferPutBinary(buffer, str, str ? 0 : strlen(str) + 1); } -static FORCE_INLINE int32_t tBufferPutF32(SBufferWriter *writer, float value) { +static FORCE_INLINE int32_t tBufferPutF32(SBuffer *buffer, float value) { union { float f; uint32_t u; } u = {.f = value}; - return tBufferPutU32(writer, u.u); + return tBufferPutU32(buffer, u.u); } -static FORCE_INLINE int32_t tBufferPutF64(SBufferWriter *writer, double value) { +static FORCE_INLINE int32_t tBufferPutF64(SBuffer *buffer, double value) { union { double f; uint64_t u; } u = {.f = value}; - return tBufferPutU64(writer, u.u); + return tBufferPutU64(buffer, u.u); } +// reader // SBufferReader -static int32_t tBufferReaderInit(SBufferReader *reader, bool forward, uint32_t offset, SBuffer *buffer) { - reader->forward = forward; - reader->offset = offset; - reader->buffer = buffer; +static int32_t tBufferReaderInit(SBufferReader *reader, uint32_t offset, SBuffer *buffer) { + (*reader) = BUFFER_READER_INITIALIZER(offset, buffer); return 0; } -static int32_t tBufferGetFixed(SBufferReader *reader, void *data, uint32_t size) { - if ((reader->forward && reader->offset + size > reader->buffer->capacity) || - (!reader->forward && reader->offset < size)) { - return TSDB_CODE_OPS_NOT_SUPPORT; - } - - if (data) { - if (reader->forward) { - memcpy(data, (char *)reader->buffer->data + reader->offset, size); - reader->offset += size; - } else { - reader->offset -= size; - memcpy(data, (char *)reader->buffer->data + reader->offset, size); - } +static FORCE_INLINE int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data) { + if (reader->offset < 0 || reader->offset + size > reader->buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; } + memcpy(data, (char *)reader->buffer->data + reader->offset, size); + reader->offset += size; return 0; } -static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); -} +static int32_t tBufferGetI8(SBufferReader *reader, int8_t *value) { return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } -static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); -} +static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetU16(SBufferReader *reader, uint16_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetU32(SBufferReader *reader, uint32_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetU64(SBufferReader *reader, uint64_t *value) { - return tBufferGetFixed(reader, value, sizeof(*value)); + return tBufferGet(reader, sizeof(*value), value); } static int32_t tBufferGetU64v(SBufferReader *reader, uint64_t *value) { @@ -274,7 +221,7 @@ static int32_t tBufferGetU64v(SBufferReader *reader, uint64_t *value) { if (code) return code; if (value) { - *value |= ((uint64_t)(byte & 0x7F)) << (i * 7); + *value |= (((uint64_t)(byte & 0x7F)) << (i * 7)); } if (byte < 0x80) { @@ -340,29 +287,25 @@ static int32_t tBufferGetBinary(SBufferReader *reader, const void **data, uint32 int32_t code; // size - code = tBufferGetU32(reader, &tmpSize); + code = tBufferGetU32v(reader, &tmpSize); if (code) return code; if (size) { *size = tmpSize; } + // data - if (reader->forward) { - if (reader->offset + tmpSize > reader->buffer->capacity) { - return TSDB_CODE_OPS_NOT_SUPPORT; + if (tmpSize > 0) { + if (reader->offset + tmpSize > reader->buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; } if (data) { *data = (char *)reader->buffer->data + reader->offset; } reader->offset += tmpSize; } else { - if (reader->offset < tmpSize) { - return TSDB_CODE_OPS_NOT_SUPPORT; - } - reader->offset -= tmpSize; - if (data) { - *data = (char *)reader->buffer->data + reader->offset; - } + *data = NULL; } + return 0; } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 0f4a896ae2..c838730b00 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -16,7 +16,6 @@ #define _DEFAULT_SOURCE #include "tdataformat.h" #include "tRealloc.h" -#include "tcoding.h" #include "tdatablock.h" #include "tlog.h" @@ -3993,15 +3992,14 @@ int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value) { ASSERT(value->type == valCol->type); if (IS_VAR_DATA_TYPE(value->type)) { - int32_t offset = tBufferGetSize(&valCol->data); - if ((code = tBufferAppend(&valCol->offsets, &offset, sizeof(offset)))) { + if ((code = tBufferPutI32(&valCol->offsets, tBufferGetSize(&valCol->data)))) { return code; } - if ((code = tBufferAppend(&valCol->data, value->pData, value->nData))) { + if ((code = tBufferPut(&valCol->data, value->pData, value->nData))) { return code; } } else { - return tBufferAppend(&valCol->data, &value->val, tDataTypes[value->type].bytes); + return tBufferPut(&valCol->data, &value->val, tDataTypes[value->type].bytes); } valCol->numOfValues++; @@ -4015,18 +4013,20 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) { value->type = valCol->type; if (IS_VAR_DATA_TYPE(value->type)) { - int32_t offset, nextOffset; + int32_t offset, nextOffset; + SBufferReader reader = BUFFER_READER_INITIALIZER(idx * sizeof(offset), &valCol->offsets); - tBufferGet(&valCol->offsets, idx, sizeof(offset), &offset); + tBufferGetI32(&reader, &offset); if (idx == valCol->numOfValues - 1) { nextOffset = tBufferGetSize(&valCol->data); } else { - tBufferGet(&valCol->offsets, idx + 1, sizeof(nextOffset), &nextOffset); + tBufferGetI32(&reader, &nextOffset); } value->nData = nextOffset - offset; value->pData = (uint8_t *)tBufferGetDataAt(&valCol->data, offset); } else { - tBufferGet(&valCol->data, idx, tDataTypes[value->type].bytes, &value->val); + SBufferReader reader = BUFFER_READER_INITIALIZER(idx * tDataTypes[value->type].bytes, &valCol->data); + tBufferGet(&reader, tDataTypes[value->type].bytes, &value->val); } return 0; } @@ -4140,54 +4140,42 @@ int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColum return 0; } -int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer) { +int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *info, SBuffer *buffer) { int32_t code; - uint8_t formatVersion = 0; + uint8_t fmtVer = 0; - // format version - code = tBufferPutU8(writer, formatVersion); - if (code) return code; - - // struct info - code = tBufferPutI8(writer, compressInfo->cmprAlg); - if (code) return code; - code = tBufferPutI8(writer, compressInfo->type); - if (code) return code; - code = tBufferPutI32v(writer, compressInfo->dataOriginalSize); - if (code) return code; - code = tBufferPutI32v(writer, compressInfo->dataCompressedSize); - if (code) return code; - code = tBufferPutI32v(writer, compressInfo->offsetOriginalSize); - if (code) return code; - code = tBufferPutI32v(writer, compressInfo->offsetCompressedSize); - if (code) return code; + if ((code = tBufferPutU8(buffer, fmtVer))) return code; + if ((code = tBufferPutI8(buffer, info->cmprAlg))) return code; + if ((code = tBufferPutI8(buffer, info->type))) return code; + if (IS_VAR_DATA_TYPE(info->type)) { + if ((code = tBufferPutI32v(buffer, info->offsetOriginalSize))) return code; + if ((code = tBufferPutI32v(buffer, info->offsetCompressedSize))) return code; + } + if ((code = tBufferPutI32v(buffer, info->dataOriginalSize))) return code; + if ((code = tBufferPutI32v(buffer, info->dataCompressedSize))) return code; return 0; } -int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo) { +int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *info) { int32_t code; - uint8_t formatVersion; + uint8_t fmtVer; - // format version - code = tBufferGetU8(reader, &formatVersion); - if (code) return code; - - if (formatVersion == 0) { - code = tBufferGetI8(reader, &compressInfo->cmprAlg); - if (code) return code; - code = tBufferGetI8(reader, &compressInfo->type); - if (code) return code; - code = tBufferGetI32v(reader, &compressInfo->dataOriginalSize); - if (code) return code; - code = tBufferGetI32v(reader, &compressInfo->dataCompressedSize); - if (code) return code; - code = tBufferGetI32v(reader, &compressInfo->offsetOriginalSize); - if (code) return code; - code = tBufferGetI32v(reader, &compressInfo->offsetCompressedSize); - if (code) return code; + if ((code = tBufferGetU8(reader, &fmtVer))) return code; + if (fmtVer == 0) { + if ((code = tBufferGetI8(reader, &info->cmprAlg))) return code; + if ((code = tBufferGetI8(reader, &info->type))) return code; + if (IS_VAR_DATA_TYPE(info->type)) { + if ((code = tBufferGetI32v(reader, &info->offsetOriginalSize))) return code; + if ((code = tBufferGetI32v(reader, &info->offsetCompressedSize))) return code; + } else { + info->offsetOriginalSize = 0; + info->offsetCompressedSize = 0; + } + if ((code = tBufferGetI32v(reader, &info->dataOriginalSize))) return code; + if ((code = tBufferGetI32v(reader, &info->dataCompressedSize))) return code; } else { - return TSDB_CODE_INVALID_DATA_FMT; + ASSERT(0); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 569570e75a..a5a2b1eaca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -19,7 +19,8 @@ struct SDataFileReader { SDataFileReaderConfig config[1]; - uint8_t *bufArr[5]; + SBuffer local[5]; + SBuffer *buffers; struct { bool headFooterLoaded; @@ -89,9 +90,14 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig TSDB_CHECK_CODE(code, lino, _exit); } + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); i++) { + tBufferInit(reader[0]->local + i); + } + reader[0]->config[0] = config[0]; - if (reader[0]->config->bufArr == NULL) { - reader[0]->config->bufArr = reader[0]->bufArr; + reader[0]->buffers = config->buffers; + if (reader[0]->buffers == NULL) { + reader[0]->buffers = reader[0]->local; } if (fname) { @@ -125,19 +131,14 @@ int32_t tsdbDataFileReaderClose(SDataFileReader **reader) { TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL); TARRAY2_DESTROY(reader[0]->brinBlkArray, NULL); -#if 0 - TARRAY2_DESTROY(reader[0]->dataBlkArray, NULL); - TARRAY2_DESTROY(reader[0]->blockIdxArray, NULL); -#endif - for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { if (reader[0]->fd[i]) { tsdbCloseFile(&reader[0]->fd[i]); } } - for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) { - tFree(reader[0]->bufArr[i]); + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) { + tBufferDestroy(reader[0]->local + i); } taosMemoryFree(reader[0]); @@ -188,38 +189,81 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size); + // load data + tBufferClear(&reader->buffers[0]); + code = tBufferEnsureCapacity(&reader->buffers[0], brinBlk->dp->size); TSDB_CHECK_CODE(code, lino, _exit); - - code = - tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0); + code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->buffers[0].data, brinBlk->dp->size, 0); TSDB_CHECK_CODE(code, lino, _exit); + reader->buffers[0].size = brinBlk->dp->size; -#if 0 - int32_t size = 0; + // decode brin block + SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); tBrinBlockClear(brinBlock); - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { - code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[i], TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, - &reader->config->bufArr[1], brinBlk->numRec * sizeof(int64_t), &reader->config->bufArr[2]); + brinBlock->numOfPKs = brinBlk->numOfPKs; + brinBlock->numOfRecords = brinBlk->numRec; + for (int32_t i = 0; i < 10; i++) { + SCompressInfo cinfo = { + .cmprAlg = brinBlk->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .compressedSize = brinBlk->size[i], + .originalSize = brinBlk->numRec * sizeof(int64_t), + }; + code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo, + brinBlock->buffers + i, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr1[i], reader->config->bufArr[1], brinBlk->numRec); - TSDB_CHECK_CODE(code, lino, _exit); - - size += brinBlk->size[i]; + br.offset += brinBlk->size[i]; } - for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) { - code = tsdbDecmprData(reader->config->bufArr[0] + size, brinBlk->size[j], TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, - &reader->config->bufArr[1], brinBlk->numRec * sizeof(int32_t), &reader->config->bufArr[2]); + for (int32_t i = 10; i < 15; i++) { + SCompressInfo cinfo = { + .cmprAlg = brinBlk->cmprAlg, + .dataType = TSDB_DATA_TYPE_INT, + .compressedSize = brinBlk->size[i], + .originalSize = brinBlk->numRec * sizeof(int32_t), + }; + code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo, + brinBlock->buffers + i, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND_BATCH(&brinBlock->dataArr2[i], reader->config->bufArr[1], brinBlk->numRec); - TSDB_CHECK_CODE(code, lino, _exit); - - size += brinBlk->size[j]; + br.offset += brinBlk->size[i]; } -#endif + + // primary keys + if (brinBlk->numOfPKs > 0) { // decode the primary keys + SValueColumnCompressInfo firstInfos[TD_MAX_PK_COLS]; + SValueColumnCompressInfo lastInfos[TD_MAX_PK_COLS]; + + for (int32_t i = 0; i < brinBlk->numOfPKs; i++) { + code = tValueColumnCompressInfoDecode(&br, firstInfos + i); + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t i = 0; i < brinBlk->numOfPKs; i++) { + code = tValueColumnCompressInfoDecode(&br, lastInfos + i); + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < brinBlk->numOfPKs; i++) { + SValueColumnCompressInfo *info = firstInfos + i; + int32_t totalCompressedSize = info->offsetCompressedSize + info->dataCompressedSize; + + code = tValueColumnDecompress(tBufferGetDataAt(br.buffer, br.offset), totalCompressedSize, info, + brinBlock->firstKeyPKs + i, reader->buffers + 1); + TSDB_CHECK_CODE(code, lino, _exit); + br.offset += totalCompressedSize; + } + + for (int32_t i = 0; i < brinBlk->numOfPKs; i++) { + SValueColumnCompressInfo *info = lastInfos + i; + int32_t totalCompressedSize = info->offsetCompressedSize + info->dataCompressedSize; + + code = tValueColumnDecompress(tBufferGetDataAt(br.buffer, br.offset), totalCompressedSize, info, + brinBlock->lastKeyPKs + i, reader->buffers + 1); + TSDB_CHECK_CODE(code, lino, _exit); + br.offset += totalCompressedSize; + } + } + + ASSERT(br.offset == br.buffer->size); _exit: if (code) { @@ -232,13 +276,15 @@ int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *re int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->config->bufArr[0], record->blockSize); + // load data + tBufferClear(&reader->buffers[0]); + code = tBufferEnsureCapacity(&reader->buffers[0], record->blockSize); TSDB_CHECK_CODE(code, lino, _exit); - - code = - tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize, 0); + code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->buffers[0].data, record->blockSize, 0); TSDB_CHECK_CODE(code, lino, _exit); + reader->buffers[0].size = record->blockSize; + // decompress code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); @@ -611,7 +657,7 @@ struct SDataFileWriter { SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - uint8_t *bufArr[5]; + SBuffer local[5]; SBuffer *buffers; struct { @@ -673,8 +719,8 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { tBlockDataDestroy(writer->ctx->blockData); tBrinBlockDestroy(writer->ctx->brinBlock); - for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) { - tFree(writer->bufArr[i]); + for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) { + tBufferDestroy(writer->local + i); } tDestroyTSchema(writer->skmRow->pTSchema); @@ -691,7 +737,7 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) { SDataFileReaderConfig config[1] = {{ .tsdb = writer->config->tsdb, .szPage = writer->config->szPage, - .bufArr = writer->config->bufArr, + .buffers = writer->buffers, }}; for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { @@ -721,7 +767,10 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; - if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr; + writer->buffers = writer->config->buffers; + if (writer->buffers == NULL) { + writer->buffers = writer->local; + } // open reader code = tsdbDataFileWriterDoOpenReader(writer); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index ee49677202..c2b9aa5260 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -29,12 +29,12 @@ typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray; typedef struct { SFDataPtr brinBlkPtr[1]; - SFDataPtr rsrvd[2]; + char rsrvd[32]; } SHeadFooter; typedef struct { SFDataPtr tombBlkPtr[1]; - SFDataPtr rsrvd[2]; + char rsrvd[32]; } STombFooter; // SDataFileReader ============================================= @@ -46,7 +46,7 @@ typedef struct SDataFileReaderConfig { bool exist; STFile file; } files[TSDB_FTYPE_MAX]; - uint8_t **bufArr; + SBuffer *buffers; } SDataFileReaderConfig; int32_t tsdbDataFileReaderOpen(const char *fname[/* TSDB_FTYPE_MAX */], const SDataFileReaderConfig *config, @@ -83,7 +83,7 @@ typedef struct SDataFileWriterConfig { } files[TSDB_FTYPE_MAX]; SSkmInfo *skmTb; SSkmInfo *skmRow; - uint8_t **bufArr; + SBuffer *buffers; } SDataFileWriterConfig; int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 81a136a0e3..952668bc95 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -29,17 +29,17 @@ struct STsdbSnapReader { int64_t ever; int8_t type; - uint8_t* aBuf[5]; + SBuffer buffers[5]; SSkmInfo skmTb[1]; TFileSetRangeArray* fsrArr; // context struct { - int32_t fsrArrIdx; + int32_t fsrArrIdx; STFileSetRange* fsr; - bool isDataDone; - bool isTombDone; + bool isDataDone; + bool isTombDone; } ctx[1]; // reader @@ -68,7 +68,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) { SDataFileReaderConfig config = { .tsdb = reader->tsdb, .szPage = reader->tsdb->pVnode->config.tsdbPageSize, - .bufArr = reader->aBuf, + .buffers = reader->buffers, }; bool hasDataFile = false; for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { @@ -1061,7 +1061,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang writer[0]->compactVersion = INT64_MAX; writer[0]->now = taosGetTimestampMs(); - code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr); + code = + tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 3464bb08a6..4c9a507986 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -503,22 +503,20 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta if (statisBlk->numOfPKs > 0) { SValueColumnCompressInfo firstKeyInfos[TD_MAX_PK_COLS]; SValueColumnCompressInfo lastKeyInfos[TD_MAX_PK_COLS]; - SBufferReader bfReader; - - tBufferReaderInit(&bfReader, true, size, &reader->buffers[0]); + SBufferReader br = BUFFER_READER_INITIALIZER(size, &reader->buffers[0]); // decode compress info for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { - code = tValueColumnCompressInfoDecode(&bfReader, &firstKeyInfos[i]); + code = tValueColumnCompressInfoDecode(&br, &firstKeyInfos[i]); TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { - code = tValueColumnCompressInfoDecode(&bfReader, &lastKeyInfos[i]); + code = tValueColumnCompressInfoDecode(&br, &lastKeyInfos[i]); TSDB_CHECK_CODE(code, lino, _exit); } - size = bfReader.offset; + size = br.offset; // decode value columns for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { @@ -686,19 +684,17 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { // compress primary keys if (statisBlk.numOfPKs > 0) { - SBufferWriter bfWriter; SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg}; tBufferClear(&writer->buffers[0]); tBufferClear(&writer->buffers[1]); - tBufferWriterInit(&bfWriter, true, 0, &writer->buffers[0]); for (int32_t i = 0; i < statisBlk.numOfPKs; i++) { code = tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); TSDB_CHECK_CODE(code, lino, _exit); - code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter); + code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]); TSDB_CHECK_CODE(code, lino, _exit); } @@ -706,7 +702,7 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); TSDB_CHECK_CODE(code, lino, _exit); - code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter); + code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 9922d9e6b2..c9c98070c0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1591,6 +1591,20 @@ _exit: return code; } +typedef struct SBlockDataCmprInfo { + // TODO +} SBlockDataCmprInfo; + +int32_t tBlockDataCompress(SBlockData *blockData, SBlockDataCmprInfo *info, SBuffer *buffer, SBuffer *assist) { + // TODO + return 0; +} + +int32_t tBlockDataDecompress(SBufferReader *reader, const SBlockDataCmprInfo *info, SBlockData *blockData) { + // TODO + return 0; +} + // SDiskDataHdr ============================== int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) { int32_t n = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 4bceee323f..6ea8844320 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -114,15 +114,19 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs); ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs); - code = tBufferAppend(&statisBlock->suids, &record->suid, sizeof(record->suid)); + code = tBufferPutI64(&statisBlock->suids, record->suid); if (code) return code; - code = tBufferAppend(&statisBlock->uids, &record->uid, sizeof(record->uid)); + + code = tBufferPutI64(&statisBlock->uids, record->uid); if (code) return code; - code = tBufferAppend(&statisBlock->firstKeyTimestamps, &record->firstKey.ts, sizeof(record->firstKey.ts)); + + code = tBufferPutI64(&statisBlock->firstKeyTimestamps, record->firstKey.ts); if (code) return code; - code = tBufferAppend(&statisBlock->lastKeyTimestamps, &record->lastKey.ts, sizeof(record->lastKey.ts)); + + code = tBufferPutI64(&statisBlock->lastKeyTimestamps, record->lastKey.ts); if (code) return code; - code = tBufferAppend(&statisBlock->counts, &record->count, sizeof(record->count)); + + code = tBufferPutI64(&statisBlock->counts, record->count); if (code) return code; for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) { @@ -137,21 +141,31 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco } int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) { - int32_t code; + int32_t code; + SBufferReader reader; if (idx < 0 || idx >= statisBlock->numOfRecords) { return TSDB_CODE_OUT_OF_RANGE; } - code = tBufferGet(&statisBlock->suids, idx, sizeof(record->suid), &record->suid); + reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->suid), &statisBlock->suids); + code = tBufferGetI64(&reader, &record->suid); if (code) return code; - code = tBufferGet(&statisBlock->uids, idx, sizeof(record->uid), &record->uid); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->uid), &statisBlock->uids); + code = tBufferGetI64(&reader, &record->uid); if (code) return code; - code = tBufferGet(&statisBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.ts), &record->firstKey.ts); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->firstKey.ts), &statisBlock->firstKeyTimestamps); + code = tBufferGetI64(&reader, &record->firstKey.ts); if (code) return code; - code = tBufferGet(&statisBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.ts), &record->lastKey.ts); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->lastKey.ts), &statisBlock->lastKeyTimestamps); + code = tBufferGetI64(&reader, &record->lastKey.ts); if (code) return code; - code = tBufferGet(&statisBlock->counts, idx, sizeof(record->count), &record->count); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->count), &statisBlock->counts); + code = tBufferGetI64(&reader, &record->count); if (code) return code; record->firstKey.numOfPKs = statisBlock->numOfPKs; @@ -216,43 +230,60 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { } ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs); - code = tBufferAppend(&brinBlock->suids, &record->suid, sizeof(record->suid)); + + code = tBufferPutI64(&brinBlock->suids, record->suid); if (code) return code; - code = tBufferAppend(&brinBlock->uids, &record->uid, sizeof(record->uid)); + + code = tBufferPutI64(&brinBlock->uids, record->uid); if (code) return code; - code = tBufferAppend(&brinBlock->firstKeyTimestamps, &record->firstKey.key.ts, sizeof(record->firstKey.key.ts)); + + code = tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts); if (code) return code; - code = tBufferAppend(&brinBlock->firstKeyVersions, &record->firstKey.version, sizeof(record->firstKey.version)); + + code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version); if (code) return code; + for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) { code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]); if (code) return code; } - code = tBufferAppend(&brinBlock->lastKeyTimestamps, &record->lastKey.key.ts, sizeof(record->lastKey.key.ts)); + + code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts); if (code) return code; - code = tBufferAppend(&brinBlock->lastKeyVersions, &record->lastKey.version, sizeof(record->lastKey.version)); + + code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version); if (code) return code; + for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) { code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]); if (code) return code; } - code = tBufferAppend(&brinBlock->minVers, &record->minVer, sizeof(record->minVer)); + + code = tBufferPutI64(&brinBlock->minVers, record->minVer); if (code) return code; - code = tBufferAppend(&brinBlock->maxVers, &record->maxVer, sizeof(record->maxVer)); + + code = tBufferPutI64(&brinBlock->maxVers, record->maxVer); if (code) return code; - code = tBufferAppend(&brinBlock->blockOffsets, &record->blockOffset, sizeof(record->blockOffset)); + + code = tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset); if (code) return code; - code = tBufferAppend(&brinBlock->smaOffsets, &record->smaOffset, sizeof(record->smaOffset)); + + code = tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset); if (code) return code; - code = tBufferAppend(&brinBlock->blockSizes, &record->blockSize, sizeof(record->blockSize)); + + code = tBufferPutI32(&brinBlock->blockSizes, record->blockSize); if (code) return code; - code = tBufferAppend(&brinBlock->blockKeySizes, &record->blockKeySize, sizeof(record->blockKeySize)); + + code = tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize); if (code) return code; - code = tBufferAppend(&brinBlock->smaSizes, &record->smaSize, sizeof(record->smaSize)); + + code = tBufferPutI32(&brinBlock->smaSizes, record->smaSize); if (code) return code; - code = tBufferAppend(&brinBlock->numRows, &record->numRow, sizeof(record->numRow)); + + code = tBufferPutI32(&brinBlock->numRows, record->numRow); if (code) return code; - code = tBufferAppend(&brinBlock->counts, &record->count, sizeof(record->count)); + + code = tBufferPutI32(&brinBlock->counts, record->count); if (code) return code; brinBlock->numOfRecords++; @@ -261,139 +292,172 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { } int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) { - int32_t code; + int32_t code; + SBufferReader reader; if (idx < 0 || idx >= brinBlock->numOfRecords) { return TSDB_CODE_OUT_OF_RANGE; } - code = tBufferGet(&brinBlock->suids, idx, sizeof(record->suid), &record->suid); + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->suids); + code = tBufferGetI64(&reader, &record->suid); if (code) return code; - code = tBufferGet(&brinBlock->uids, idx, sizeof(record->uid), &record->uid); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->uids); + code = tBufferGetI64(&reader, &record->uid); if (code) return code; - code = tBufferGet(&brinBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.key.ts), &record->firstKey.key.ts); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyTimestamps); + code = tBufferGetI64(&reader, &record->firstKey.key.ts); if (code) return code; - code = tBufferGet(&brinBlock->firstKeyVersions, idx, sizeof(record->firstKey.version), &record->firstKey.version); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyVersions); + code = tBufferGetI64(&reader, &record->firstKey.version); if (code) return code; + for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs; record->firstKey.key.numOfPKs++) { code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx, &record->firstKey.key.pks[record->firstKey.key.numOfPKs]); if (code) return code; } - code = tBufferGet(&brinBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.key.ts), &record->lastKey.key.ts); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyTimestamps); + code = tBufferGetI64(&reader, &record->lastKey.key.ts); if (code) return code; - code = tBufferGet(&brinBlock->lastKeyVersions, idx, sizeof(record->lastKey.version), &record->lastKey.version); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyVersions); + code = tBufferGetI64(&reader, &record->lastKey.version); if (code) return code; + for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs; record->lastKey.key.numOfPKs++) { code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx, &record->lastKey.key.pks[record->lastKey.key.numOfPKs]); if (code) return code; } - code = tBufferGet(&brinBlock->minVers, idx, sizeof(record->minVer), &record->minVer); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->minVers); + code = tBufferGetI64(&reader, &record->minVer); if (code) return code; - code = tBufferGet(&brinBlock->maxVers, idx, sizeof(record->maxVer), &record->maxVer); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->maxVers); + code = tBufferGetI64(&reader, &record->maxVer); if (code) return code; - code = tBufferGet(&brinBlock->blockOffsets, idx, sizeof(record->blockOffset), &record->blockOffset); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->blockOffsets); + code = tBufferGetI64(&reader, &record->blockOffset); if (code) return code; - code = tBufferGet(&brinBlock->smaOffsets, idx, sizeof(record->smaOffset), &record->smaOffset); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->smaOffsets); + code = tBufferGetI64(&reader, &record->smaOffset); if (code) return code; - code = tBufferGet(&brinBlock->blockSizes, idx, sizeof(record->blockSize), &record->blockSize); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockSizes); + code = tBufferGetI32(&reader, &record->blockSize); if (code) return code; - code = tBufferGet(&brinBlock->blockKeySizes, idx, sizeof(record->blockKeySize), &record->blockKeySize); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockKeySizes); + code = tBufferGetI32(&reader, &record->blockKeySize); if (code) return code; - code = tBufferGet(&brinBlock->smaSizes, idx, sizeof(record->smaSize), &record->smaSize); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->smaSizes); + code = tBufferGetI32(&reader, &record->smaSize); if (code) return code; - code = tBufferGet(&brinBlock->numRows, idx, sizeof(record->numRow), &record->numRow); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->numRows); + code = tBufferGetI32(&reader, &record->numRow); if (code) return code; - code = tBufferGet(&brinBlock->counts, idx, sizeof(record->count), &record->count); + + reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->counts); + code = tBufferGetI32(&reader, &record->count); if (code) return code; return 0; } -int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) { - int32_t code; - SBuffer *helperBuffer = NULL; // TODO +// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) { +// int32_t code; +// SBuffer *helperBuffer = NULL; // TODO - brinBlk->dp[0].size = 0; - brinBlk->numRec = brinBlock->numOfRecords; - brinBlk->numOfPKs = brinBlock->numOfPKs; +// brinBlk->dp[0].size = 0; +// brinBlk->numRec = brinBlock->numOfRecords; +// brinBlk->numOfPKs = brinBlock->numOfPKs; - // minTbid - code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid); - if (code) return code; - code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid); - if (code) return code; - // maxTbid - code = - tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), &brinBlk->maxTbid.suid); - if (code) return code; - code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), &brinBlk->maxTbid.uid); - if (code) return code; - // minVer and maxVer - const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); - const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); - brinBlk->minVer = minVers[0]; - brinBlk->maxVer = maxVers[0]; - for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) { - if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i]; - if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i]; - } +// // minTbid +// code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid); +// if (code) return code; +// code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid); +// if (code) return code; +// // maxTbid +// code = +// tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), +// &brinBlk->maxTbid.suid); +// if (code) return code; +// code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), +// &brinBlk->maxTbid.uid); if (code) return code; +// // minVer and maxVer +// const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); +// const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); +// brinBlk->minVer = minVers[0]; +// brinBlk->maxVer = maxVers[0]; +// for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) { +// if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i]; +// if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i]; +// } - // compress data - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { - SBuffer *bf = &brinBlock->buffers[i]; - SCompressInfo info = { - .cmprAlg = brinBlk->cmprAlg, - }; +// // compress data +// for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { +// SBuffer *bf = &brinBlock->buffers[i]; +// SCompressInfo info = { +// .cmprAlg = brinBlk->cmprAlg, +// }; - if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { - info.dataType = TSDB_DATA_TYPE_BIGINT; - } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { - info.dataType = TSDB_DATA_TYPE_INT; - } else { - ASSERT(0); - } +// if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { +// info.dataType = TSDB_DATA_TYPE_BIGINT; +// } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { +// info.dataType = TSDB_DATA_TYPE_INT; +// } else { +// ASSERT(0); +// } - code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); - if (code) return code; - brinBlk->size[i] = info.compressedSize; - brinBlk->dp[0].size += info.compressedSize; - } +// code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); +// if (code) return code; +// brinBlk->size[i] = info.compressedSize; +// brinBlk->dp[0].size += info.compressedSize; +// } - // encode primary keys - SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS]; - SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS]; +// // encode primary keys +// SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS]; +// SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS]; - for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { - SValueColumn *vc = &brinBlock->firstKeyPKs[i]; - firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; - code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer); - if (code) return code; - } +// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { +// SValueColumn *vc = &brinBlock->firstKeyPKs[i]; +// firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; +// code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer); +// if (code) return code; +// } - for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { - SValueColumn *vc = &brinBlock->lastKeyPKs[i]; - lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; - code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer); - if (code) return code; - } +// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { +// SValueColumn *vc = &brinBlock->lastKeyPKs[i]; +// lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; +// code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer); +// if (code) return code; +// } - return 0; -} +// return 0; +// } -int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) { - // if (brinBlk->fmtVersion == 0) { - // return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock); - // } else if (brinBlk->fmtVersion == 1) { - // return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock); - // } else { - // ASSERT(0); - // } - return 0; -} +// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) { +// if (brinBlk->fmtVersion == 0) { +// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock); +// } else if (brinBlk->fmtVersion == 1) { +// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock); +// } else { +// ASSERT(0); +// } +// return 0; +// } // other apis ---------- int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 51ccbb59a1..42b62dbdd2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -84,11 +84,11 @@ typedef struct { union { SBuffer buffers[5]; struct { - SBuffer suids; - SBuffer uids; - SBuffer firstKeyTimestamps; - SBuffer lastKeyTimestamps; - SBuffer counts; + SBuffer suids; // int64_t + SBuffer uids; // int64_t + SBuffer firstKeyTimestamps; // int64_t + SBuffer lastKeyTimestamps; // int64_t + SBuffer counts; // int64_t }; }; SValueColumn firstKeyPKs[TD_MAX_PK_COLS]; @@ -180,8 +180,8 @@ int32_t tBrinBlockDestroy(SBrinBlock *brinBlock); int32_t tBrinBlockClear(SBrinBlock *brinBlock); int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record); int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record); -int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer); -int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock); +// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer); +// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock); // other apis int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb); diff --git a/source/util/test/bufferTest.cpp b/source/util/test/bufferTest.cpp index 310f1ffb68..400c6b3d4b 100644 --- a/source/util/test/bufferTest.cpp +++ b/source/util/test/bufferTest.cpp @@ -28,74 +28,72 @@ TEST(BufferTest, forwardWriteAndRead) { taosSeedRand(taosGetTimestampSec()); // write - SBufferWriter writer = BUFFER_WRITER_INITIALIZER(forward, tBufferGetSize(&buffer), &buffer); - /* fix-len struct */ STestStruct testStruct = {1, 2}; - GTEST_ASSERT_EQ(tBufferPutFixed(&writer, &testStruct, sizeof(STestStruct)), 0); + GTEST_ASSERT_EQ(tBufferPut(&buffer, &testStruct, sizeof(STestStruct)), 0); /* int8_t */ int8_t i8 = taosRand() % UINT8_MAX - INT8_MAX; - GTEST_ASSERT_EQ(tBufferPutI8(&writer, i8), 0); + GTEST_ASSERT_EQ(tBufferPutI8(&buffer, i8), 0); /* int16_t */ int8_t i16 = taosRand() % UINT16_MAX - INT16_MAX; - GTEST_ASSERT_EQ(tBufferPutI16(&writer, i16), 0); + GTEST_ASSERT_EQ(tBufferPutI16(&buffer, i16), 0); /* int32_t */ int8_t i32 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutI32(&writer, i32), 0); + GTEST_ASSERT_EQ(tBufferPutI32(&buffer, i32), 0); /* int64_t */ int64_t i64 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutI64(&writer, i64), 0); + GTEST_ASSERT_EQ(tBufferPutI64(&buffer, i64), 0); /* uint8_t */ uint8_t u8 = taosRand() % UINT8_MAX; - GTEST_ASSERT_EQ(tBufferPutU8(&writer, u8), 0); + GTEST_ASSERT_EQ(tBufferPutU8(&buffer, u8), 0); /* uint16_t */ uint16_t u16 = taosRand() % UINT16_MAX; - GTEST_ASSERT_EQ(tBufferPutU16(&writer, u16), 0); + GTEST_ASSERT_EQ(tBufferPutU16(&buffer, u16), 0); /* uint32_t */ uint32_t u32 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutU32(&writer, u32), 0); + GTEST_ASSERT_EQ(tBufferPutU32(&buffer, u32), 0); /* uint64_t */ uint64_t u64 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutU64(&writer, u64), 0); + GTEST_ASSERT_EQ(tBufferPutU64(&buffer, u64), 0); /* float */ float f = (float)taosRand() / (float)taosRand(); - GTEST_ASSERT_EQ(tBufferPutF32(&writer, f), 0); + GTEST_ASSERT_EQ(tBufferPutF32(&buffer, f), 0); /* double */ double d = (double)taosRand() / (double)taosRand(); - GTEST_ASSERT_EQ(tBufferPutF64(&writer, d), 0); + GTEST_ASSERT_EQ(tBufferPutF64(&buffer, d), 0); /* binary */ uint8_t binary[10]; for (int32_t i = 0; i < sizeof(binary); ++i) { binary[i] = taosRand() % UINT8_MAX; } - GTEST_ASSERT_EQ(tBufferPutBinary(&writer, binary, sizeof(binary)), 0); + GTEST_ASSERT_EQ(tBufferPutBinary(&buffer, binary, sizeof(binary)), 0); /* cstr */ const char *cstr = "hello world"; - GTEST_ASSERT_EQ(tBufferPutCStr(&writer, cstr), 0); + GTEST_ASSERT_EQ(tBufferPutCStr(&buffer, cstr), 0); /* uint16v_t */ uint16_t u16v[] = {0, 127, 128, 129, 16384, 16385, 16386, UINT16_MAX}; for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU16v(&writer, u16v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutU16v(&buffer, u16v[i]), 0); } /* uint32v_t */ uint32_t u32v[] = {0, 127, 128, 129, 16384, 16385, 16386, (1 << 21) - 1, (1 << 21), (1 << 21) + 1, (1 << 28) - 1, (1 << 28), (1 << 28) + 1, UINT32_MAX}; for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU32v(&writer, u32v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutU32v(&buffer, u32v[i]), 0); } /* uint64v_t */ @@ -129,7 +127,7 @@ TEST(BufferTest, forwardWriteAndRead) { (1ul << (7 * 9)) + 1, UINT64_MAX}; for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU64v(&writer, u64v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutU64v(&buffer, u64v[i]), 0); } /* int16v_t */ @@ -153,7 +151,7 @@ TEST(BufferTest, forwardWriteAndRead) { INT16_MAX, }; for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI16v(&writer, i16v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutI16v(&buffer, i16v[i]), 0); } /* int32v_t */ @@ -189,7 +187,7 @@ TEST(BufferTest, forwardWriteAndRead) { INT32_MAX, }; for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI32v(&writer, i32v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutI32v(&buffer, i32v[i]), 0); } /* int64v_t */ @@ -248,17 +246,15 @@ TEST(BufferTest, forwardWriteAndRead) { INT64_MAX, }; for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI64v(&writer, i64v[i]), 0); + GTEST_ASSERT_EQ(tBufferPutI64v(&buffer, i64v[i]), 0); } - tBufferWriterDestroy(&writer); - // read - SBufferReader reader = BUFFER_READER_INITIALIZER(forward, 0, &buffer); + SBufferReader reader = BUFFER_READER_INITIALIZER(0, &buffer); /* fix-len struct */ STestStruct testStruct2 = {1, 2}; - GTEST_ASSERT_EQ(tBufferGetFixed(&reader, &testStruct2, sizeof(STestStruct)), 0); + GTEST_ASSERT_EQ(tBufferGet(&reader, sizeof(STestStruct), &testStruct2), 0); GTEST_ASSERT_EQ(testStruct.value1, testStruct2.value1); GTEST_ASSERT_EQ(testStruct.value2, testStruct2.value2); GTEST_ASSERT_EQ(testStruct.value3, testStruct2.value3); @@ -372,359 +368,3 @@ TEST(BufferTest, forwardWriteAndRead) { // clear tBufferDestroy(&buffer); } - -TEST(BufferTest, backwardWriteAndRead) { - int32_t code = 0; - bool forward = false; - SBuffer buffer; - - tBufferInit(&buffer); - taosSeedRand(taosGetTimestampSec()); - - // write - SBufferWriter writer = BUFFER_WRITER_INITIALIZER(forward, 4096, &buffer); - - /* fix-len struct */ - STestStruct testStruct = {1, 2}; - GTEST_ASSERT_EQ(tBufferPutFixed(&writer, &testStruct, sizeof(STestStruct)), 0); - - /* int8_t */ - int8_t i8 = taosRand() % UINT8_MAX - INT8_MAX; - GTEST_ASSERT_EQ(tBufferPutI8(&writer, i8), 0); - - /* int16_t */ - int8_t i16 = taosRand() % UINT16_MAX - INT16_MAX; - GTEST_ASSERT_EQ(tBufferPutI16(&writer, i16), 0); - - /* int32_t */ - int8_t i32 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutI32(&writer, i32), 0); - - /* int64_t */ - int64_t i64 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutI64(&writer, i64), 0); - - /* uint8_t */ - uint8_t u8 = taosRand() % UINT8_MAX; - GTEST_ASSERT_EQ(tBufferPutU8(&writer, u8), 0); - - /* uint16_t */ - uint16_t u16 = taosRand() % UINT16_MAX; - GTEST_ASSERT_EQ(tBufferPutU16(&writer, u16), 0); - - /* uint32_t */ - uint32_t u32 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutU32(&writer, u32), 0); - - /* uint64_t */ - uint64_t u64 = taosRand(); - GTEST_ASSERT_EQ(tBufferPutU64(&writer, u64), 0); - - /* float */ - float f = (float)taosRand() / (float)taosRand(); - GTEST_ASSERT_EQ(tBufferPutF32(&writer, f), 0); - - /* double */ - double d = (double)taosRand() / (double)taosRand(); - GTEST_ASSERT_EQ(tBufferPutF64(&writer, d), 0); - - /* binary */ - uint8_t binary[10]; - for (int32_t i = 0; i < sizeof(binary); ++i) { - binary[i] = taosRand() % UINT8_MAX; - } - GTEST_ASSERT_EQ(tBufferPutBinary(&writer, binary, sizeof(binary)), 0); - - /* cstr */ - const char *cstr = "hello world"; - GTEST_ASSERT_EQ(tBufferPutCStr(&writer, cstr), 0); - - /* uint16v_t */ - uint16_t u16v[] = {0, 127, 128, 129, 16384, 16385, 16386, UINT16_MAX}; - for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU16v(&writer, u16v[i]), 0); - } - - /* uint32v_t */ - uint32_t u32v[] = {0, 127, 128, 129, 16384, 16385, 16386, (1 << 21) - 1, - (1 << 21), (1 << 21) + 1, (1 << 28) - 1, (1 << 28), (1 << 28) + 1, UINT32_MAX}; - for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU32v(&writer, u32v[i]), 0); - } - - /* uint64v_t */ - uint64_t u64v[] = {0, // 0 - (1ul << (7 * 1)) - 1, - (1ul << (7 * 1)), - (1ul << (7 * 1)) + 1, - (1ul << (7 * 2)) - 1, - (1ul << (7 * 2)), - (1ul << (7 * 2)) + 1, - (1ul << (7 * 3)) - 1, - (1ul << (7 * 3)), - (1ul << (7 * 3)) + 1, - (1ul << (7 * 4)) - 1, - (1ul << (7 * 4)), - (1ul << (7 * 4)) + 1, - (1ul << (7 * 5)) - 1, - (1ul << (7 * 5)), - (1ul << (7 * 5)) + 1, - (1ul << (7 * 6)) - 1, - (1ul << (7 * 6)), - (1ul << (7 * 6)) + 1, - (1ul << (7 * 7)) - 1, - (1ul << (7 * 7)), - (1ul << (7 * 7)) + 1, - (1ul << (7 * 8)) - 1, - (1ul << (7 * 8)), - (1ul << (7 * 8)) + 1, - (1ul << (7 * 9)) - 1, - (1ul << (7 * 9)), - (1ul << (7 * 9)) + 1, - UINT64_MAX}; - for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutU64v(&writer, u64v[i]), 0); - } - - /* int16v_t */ - int16_t i16v[] = { - INT16_MIN, // - -((1 << (7 * 1)) - 1), - -((1 << (7 * 1))), - -((1 << (7 * 1)) + 1), - -((1 << (7 * 2)) - 1), - -((1 << (7 * 2))), - -((1 << (7 * 2)) + 1), - (1 << (7 * 0)) - 1, - (1 << (7 * 0)), - (1 << (7 * 0)) + 1, - (1 << (7 * 1)) - 1, - (1 << (7 * 1)), - (1 << (7 * 1)) + 1, - (1 << (7 * 2)) - 1, - (1 << (7 * 2)), - (1 << (7 * 2)) + 1, - INT16_MAX, - }; - for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI16v(&writer, i16v[i]), 0); - } - - /* int32v_t */ - int32_t i32v[] = { - INT32_MIN, // - -((1 << (7 * 1)) - 1), - -((1 << (7 * 1))), - -((1 << (7 * 1)) + 1), - -((1 << (7 * 2)) - 1), - -((1 << (7 * 2))), - -((1 << (7 * 2)) + 1), - -((1 << (7 * 3)) - 1), - -((1 << (7 * 3))), - -((1 << (7 * 3)) + 1), - -((1 << (7 * 4)) - 1), - -((1 << (7 * 4))), - -((1 << (7 * 4)) + 1), - (1 << (7 * 0)) - 1, - (1 << (7 * 0)), - (1 << (7 * 0)) + 1, - (1 << (7 * 1)) - 1, - (1 << (7 * 1)), - (1 << (7 * 1)) + 1, - (1 << (7 * 2)) - 1, - (1 << (7 * 2)), - (1 << (7 * 2)) + 1, - (1 << (7 * 3)) - 1, - (1 << (7 * 3)), - (1 << (7 * 3)) + 1, - (1 << (7 * 4)) - 1, - (1 << (7 * 4)), - (1 << (7 * 4)) + 1, - INT32_MAX, - }; - for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI32v(&writer, i32v[i]), 0); - } - - /* int64v_t */ - int64_t i64v[] = { - INT64_MIN, // - -((1l << (7 * 1)) - 1), - -((1l << (7 * 1))), - -((1l << (7 * 1)) + 1), - -((1l << (7 * 2)) - 1), - -((1l << (7 * 2))), - -((1l << (7 * 2)) + 1), - -((1l << (7 * 3)) - 1), - -((1l << (7 * 3))), - -((1l << (7 * 3)) + 1), - -((1l << (7 * 4)) - 1), - -((1l << (7 * 4))), - -((1l << (7 * 4)) + 1), - -((1l << (7 * 5)) - 1), - -((1l << (7 * 5))), - -((1l << (7 * 5)) + 1), - -((1l << (7 * 6)) - 1), - -((1l << (7 * 6))), - -((1l << (7 * 6)) + 1), - -((1l << (7 * 7)) - 1), - -((1l << (7 * 7))), - -((1l << (7 * 7)) + 1), - -((1l << (7 * 8)) - 1), - -((1l << (7 * 8))), - -((1l << (7 * 8)) + 1), - -((1l << (7 * 9)) + 1), - ((1l << (7 * 1)) - 1), - ((1l << (7 * 1))), - ((1l << (7 * 1)) + 1), - ((1l << (7 * 2)) - 1), - ((1l << (7 * 2))), - ((1l << (7 * 2)) + 1), - ((1l << (7 * 3)) - 1), - ((1l << (7 * 3))), - ((1l << (7 * 3)) + 1), - ((1l << (7 * 4)) - 1), - ((1l << (7 * 4))), - ((1l << (7 * 4)) + 1), - ((1l << (7 * 5)) - 1), - ((1l << (7 * 5))), - ((1l << (7 * 5)) + 1), - ((1l << (7 * 6)) - 1), - ((1l << (7 * 6))), - ((1l << (7 * 6)) + 1), - ((1l << (7 * 7)) - 1), - ((1l << (7 * 7))), - ((1l << (7 * 7)) + 1), - ((1l << (7 * 8)) - 1), - ((1l << (7 * 8))), - ((1l << (7 * 8)) + 1), - ((1l << (7 * 9)) + 1), - INT64_MAX, - }; - for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferPutI64v(&writer, i64v[i]), 0); - } - - tBufferWriterDestroy(&writer); - - // read - SBufferReader reader = BUFFER_READER_INITIALIZER(forward, 4096, &buffer); - - /* fix-len struct */ - STestStruct testStruct2 = {1, 2}; - GTEST_ASSERT_EQ(tBufferGetFixed(&reader, &testStruct2, sizeof(STestStruct)), 0); - GTEST_ASSERT_EQ(testStruct.value1, testStruct2.value1); - GTEST_ASSERT_EQ(testStruct.value2, testStruct2.value2); - GTEST_ASSERT_EQ(testStruct.value3, testStruct2.value3); - - /* int8_t */ - int8_t i8_2 = 97; - GTEST_ASSERT_EQ(tBufferGetI8(&reader, &i8_2), 0); - GTEST_ASSERT_EQ(i8, i8_2); - - /* int16_t */ - int16_t i16_2; - GTEST_ASSERT_EQ(tBufferGetI16(&reader, &i16_2), 0); - GTEST_ASSERT_EQ(i16, i16_2); - - /* int32_t */ - int32_t i32_2; - GTEST_ASSERT_EQ(tBufferGetI32(&reader, &i32_2), 0); - GTEST_ASSERT_EQ(i32, i32_2); - - /* int64_t */ - int64_t i64_2; - GTEST_ASSERT_EQ(tBufferGetI64(&reader, &i64_2), 0); - GTEST_ASSERT_EQ(i64, i64_2); - - /* uint8_t */ - uint8_t u8_2; - GTEST_ASSERT_EQ(tBufferGetU8(&reader, &u8_2), 0); - GTEST_ASSERT_EQ(u8, u8_2); - - /* uint16_t */ - uint16_t u16_2; - GTEST_ASSERT_EQ(tBufferGetU16(&reader, &u16_2), 0); - GTEST_ASSERT_EQ(u16, u16_2); - - /* uint32_t */ - uint32_t u32_2; - GTEST_ASSERT_EQ(tBufferGetU32(&reader, &u32_2), 0); - GTEST_ASSERT_EQ(u32, u32_2); - - /* uint64_t */ - uint64_t u64_2; - GTEST_ASSERT_EQ(tBufferGetU64(&reader, &u64_2), 0); - GTEST_ASSERT_EQ(u64, u64_2); - - /* float */ - float f_2; - GTEST_ASSERT_EQ(tBufferGetF32(&reader, &f_2), 0); - GTEST_ASSERT_EQ(f, f_2); - - /* double */ - double d_2; - GTEST_ASSERT_EQ(tBufferGetF64(&reader, &d_2), 0); - GTEST_ASSERT_EQ(d, d_2); - - /* binary */ - const void *binary2; - uint32_t binarySize; - GTEST_ASSERT_EQ(tBufferGetBinary(&reader, &binary2, &binarySize), 0); - GTEST_ASSERT_EQ(memcmp(binary, binary2, sizeof(binary)), 0); - GTEST_ASSERT_EQ(binarySize, sizeof(binary)); - - /* cstr */ - const char *cstr2; - GTEST_ASSERT_EQ(tBufferGetCStr(&reader, &cstr2), 0); - GTEST_ASSERT_EQ(strcmp(cstr, cstr2), 0); - - /* uint16v_t */ - uint16_t u16v2[sizeof(u16v) / sizeof(u16v[0])]; - for (int32_t i = 0; i < sizeof(u16v) / sizeof(u16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetU16v(&reader, &u16v2[i]), 0); - GTEST_ASSERT_EQ(u16v[i], u16v2[i]); - } - - /* uint32v_t */ - uint32_t u32v2[sizeof(u32v) / sizeof(u32v[0])]; - for (int32_t i = 0; i < sizeof(u32v) / sizeof(u32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetU32v(&reader, &u32v2[i]), 0); - GTEST_ASSERT_EQ(u32v[i], u32v2[i]); - } - - /* uint64v_t */ - uint64_t u64v2[sizeof(u64v) / sizeof(u64v[0])]; - for (int32_t i = 0; i < sizeof(u64v) / sizeof(u64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetU64v(&reader, &u64v2[i]), 0); - GTEST_ASSERT_EQ(u64v[i], u64v2[i]); - } - - /* int16v_t */ - int16_t i16v2[sizeof(i16v) / sizeof(i16v[0])]; - for (int32_t i = 0; i < sizeof(i16v) / sizeof(i16v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetI16v(&reader, &i16v2[i]), 0); - GTEST_ASSERT_EQ(i16v[i], i16v2[i]); - } - - /* int32v_t */ - int32_t i32v2[sizeof(i32v) / sizeof(i32v[0])]; - for (int32_t i = 0; i < sizeof(i32v) / sizeof(i32v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetI32v(&reader, &i32v2[i]), 0); - GTEST_ASSERT_EQ(i32v[i], i32v2[i]); - } - - /* int64v_t */ - int64_t i64v2[sizeof(i64v) / sizeof(i64v[0])]; - for (int32_t i = 0; i < sizeof(i64v) / sizeof(i64v[0]); ++i) { - GTEST_ASSERT_EQ(tBufferGetI64v(&reader, &i64v2[i]), 0); - GTEST_ASSERT_EQ(i64v[i], i64v2[i]); - } - - tBufferReaderDestroy(&reader); - - GTEST_ASSERT_EQ(reader.offset, writer.offset); - - // clear - tBufferDestroy(&buffer); -} \ No newline at end of file