From 8aa89525c8349778320a74eb48e49f370ece30c4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 29 May 2020 10:56:45 +0000 Subject: [PATCH 01/13] add signed value codig functions --- src/tsdb/src/tsdbFile.c | 2 +- src/tsdb/src/tsdbRWHelper.c | 54 ++++---- src/util/inc/tcoding.h | 232 +++++++++++++++++++++++---------- src/util/tests/codingTests.cpp | 24 ++-- 4 files changed, 203 insertions(+), 109 deletions(-) diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index bd5c20bd7a..e885d9a919 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -94,7 +94,7 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1; void *pBuf = buf; - pBuf = taosDecodeFixed32(pBuf, &version); + pBuf = taosDecodeFixedU32(pBuf, &version); pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); tsdbCloseFile(pFile); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index eebe0b6b46..38e6b8f2f6 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -443,7 +443,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { for (uint32_t i = 0; i < pHelper->config.maxTables; i++) { SCompIdx *pCompIdx = pHelper->pCompIdx + i; if (pCompIdx->offset > 0) { - buf = taosEncodeVariant32(buf, i); + buf = taosEncodeVariantU32(buf, i); buf = tsdbEncodeSCompIdx(buf, pCompIdx); } } @@ -480,7 +480,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { void *ptr = pHelper->pBuffer; while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { uint32_t tid = 0; - if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1; + if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1; ASSERT(tid > 0 && tid < pHelper->config.maxTables); if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; @@ -1242,12 +1242,12 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) } void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) { - buf = taosEncodeVariant32(buf, pIdx->len); - buf = taosEncodeVariant32(buf, pIdx->offset); - buf = taosEncodeFixed8(buf, pIdx->hasLast); - buf = taosEncodeVariant32(buf, pIdx->numOfBlocks); - buf = taosEncodeFixed64(buf, pIdx->uid); - buf = taosEncodeFixed64(buf, pIdx->maxKey); + buf = taosEncodeVariantU32(buf, pIdx->len); + buf = taosEncodeVariantU32(buf, pIdx->offset); + buf = taosEncodeFixedU8(buf, pIdx->hasLast); + buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks); + buf = taosEncodeFixedU64(buf, pIdx->uid); + buf = taosEncodeFixedU64(buf, pIdx->maxKey); return buf; } @@ -1257,15 +1257,15 @@ void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { uint32_t numOfBlocks = 0; uint64_t value = 0; - if ((buf = taosDecodeVariant32(buf, &(pIdx->len))) == NULL) return NULL; - if ((buf = taosDecodeVariant32(buf, &(pIdx->offset))) == NULL) return NULL; - if ((buf = taosDecodeFixed8(buf, &(hasLast))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; + if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; pIdx->hasLast = hasLast; - if ((buf = taosDecodeVariant32(buf, &(numOfBlocks))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; pIdx->numOfBlocks = numOfBlocks; - if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; pIdx->uid = (int64_t)value; - if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; pIdx->maxKey = (TSKEY)value; return buf; @@ -1275,7 +1275,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; void *pBuf = (void *)buf; - pBuf = taosEncodeFixed32(pBuf, version); + pBuf = taosEncodeFixedU32(pBuf, version); pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -1289,23 +1289,23 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) { - buf = taosEncodeFixed32(buf, pInfo->offset); - buf = taosEncodeFixed32(buf, pInfo->len); - buf = taosEncodeFixed64(buf, pInfo->size); - buf = taosEncodeFixed64(buf, pInfo->tombSize); - buf = taosEncodeFixed32(buf, pInfo->totalBlocks); - buf = taosEncodeFixed32(buf, pInfo->totalSubBlocks); + buf = taosEncodeFixedU32(buf, pInfo->offset); + buf = taosEncodeFixedU32(buf, pInfo->len); + buf = taosEncodeFixedU64(buf, pInfo->size); + buf = taosEncodeFixedU64(buf, pInfo->tombSize); + buf = taosEncodeFixedU32(buf, pInfo->totalBlocks); + buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks); return buf; } void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixed32(buf, &(pInfo->offset)); - buf = taosDecodeFixed32(buf, &(pInfo->len)); - buf = taosDecodeFixed64(buf, &(pInfo->size)); - buf = taosDecodeFixed64(buf, &(pInfo->tombSize)); - buf = taosDecodeFixed32(buf, &(pInfo->totalBlocks)); - buf = taosDecodeFixed32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->len)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); return buf; } \ No newline at end of file diff --git a/src/util/inc/tcoding.h b/src/util/inc/tcoding.h index cc9caf71d0..e22c959a56 100644 --- a/src/util/inc/tcoding.h +++ b/src/util/inc/tcoding.h @@ -29,12 +29,33 @@ extern "C" { static const int32_t TNUMBER = 1; #define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0) -static FORCE_INLINE void *taosEncodeFixed8(void *buf, uint8_t value) { +#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode +#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode + +// ---- Fixed U8 +static FORCE_INLINE void *taosEncodeFixedU8(void *buf, uint8_t value) { ((uint8_t *)buf)[0] = value; return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) { +static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) { + *value = ((uint8_t *)buf)[0]; + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I8 +static FORCE_INLINE void *taosEncodeFixedI8(void *buf, int8_t value) { + ((int8_t *)buf)[0] = value; + return POINTER_SHIFT(buf, sizeof(value)); +} + +static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) { + *value = ((int8_t *)buf)[0]; + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed U16 +static FORCE_INLINE void *taosEncodeFixedU16(void *buf, uint16_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -45,7 +66,31 @@ static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) { +static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) { + if (IS_LITTLE_ENDIAN()) { + memcpy(value, buf, sizeof(*value)); + } else { + ((uint8_t *)value)[1] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[1]; + } + + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I16 +static FORCE_INLINE void *taosEncodeFixedI16(void *buf, int16_t value) { + return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) { + uint16_t tvalue = 0; + void * ret = taosDecodeFixedU16(buf, &tvalue); + *value = ZIGZAGD(int16_t, tvalue); + return ret; +} + +// ---- Fixed U32 +static FORCE_INLINE void *taosEncodeFixedU32(void *buf, uint32_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -58,7 +103,33 @@ static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) { +static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) { + if (IS_LITTLE_ENDIAN()) { + memcpy(value, buf, sizeof(*value)); + } else { + ((uint8_t *)value)[3] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[2] = ((uint8_t *)buf)[1]; + ((uint8_t *)value)[1] = ((uint8_t *)buf)[2]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[3]; + } + + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I32 +static FORCE_INLINE void *taosEncodeFixedI32(void *buf, int32_t value) { + return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) { + uint32_t tvalue = 0; + void * ret = taosDecodeFixedU32(buf, &tvalue); + *value = ZIGZAGD(int32_t, tvalue); + return ret; +} + +// ---- Fixed U64 +static FORCE_INLINE void *taosEncodeFixedU64(void *buf, uint64_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -75,36 +146,7 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosDecodeFixed8(void *buf, uint8_t *value) { - *value = ((uint8_t *)buf)[0]; - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) { - if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); - } else { - ((uint8_t *)value)[1] = ((uint8_t *)buf)[0]; - ((uint8_t *)value)[0] = ((uint8_t *)buf)[1]; - } - - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed32(void *buf, uint32_t *value) { - if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); - } else { - ((uint8_t *)value)[3] = ((uint8_t *)buf)[0]; - ((uint8_t *)value)[2] = ((uint8_t *)buf)[1]; - ((uint8_t *)value)[1] = ((uint8_t *)buf)[2]; - ((uint8_t *)value)[0] = ((uint8_t *)buf)[3]; - } - - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) { +static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) { if (IS_LITTLE_ENDIAN()) { memcpy(value, buf, sizeof(*value)); } else { @@ -121,7 +163,20 @@ static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) { return POINTER_SHIFT(buf, sizeof(*value)); } -static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) { +// ---- Fixed I64 +static FORCE_INLINE void *taosEncodeFixedI64(void *buf, int64_t value) { + return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) { + uint64_t tvalue = 0; + void * ret = taosDecodeFixedU64(buf, &tvalue); + *value = ZIGZAGD(int64_t, tvalue); + return ret; +} + +// ---- Variant U16 +static FORCE_INLINE void *taosEncodeVariantU16(void *buf, uint16_t value) { int i = 0; while (value >= ENCODE_LIMIT) { ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); @@ -132,39 +187,11 @@ static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) { ((uint8_t *)buf)[i] = value; - return POINTER_SHIFT(buf, i+1); -} - -static FORCE_INLINE void *taosEncodeVariant32(void *buf, uint32_t value) { - int i = 0; - while (value >= ENCODE_LIMIT) { - ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); - value >>= 7; - i++; - ASSERT(i < 5); - } - - ((uint8_t *)buf)[i] = value; - return POINTER_SHIFT(buf, i + 1); } -static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) { - int i = 0; - while (value >= ENCODE_LIMIT) { - ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); - value >>= 7; - i++; - ASSERT(i < 10); - } - - ((uint8_t *)buf)[i] = value; - - return POINTER_SHIFT(buf, i + 1); -} - -static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) { - int i = 0; +static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) { + int i = 0; uint16_t tval = 0; *value = 0; while (i < 3) { @@ -181,8 +208,35 @@ static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) { return NULL; // error happened } -static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) { +// ---- Variant I16 +static FORCE_INLINE void *taosEncodeVariantI16(void *buf, int16_t value) { + return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) { + uint16_t tvalue = 0; + void * ret = taosDecodeVariantU16(buf, &tvalue); + *value = ZIGZAGD(int16_t, tvalue); + return ret; +} + +// ---- Variant U32 +static FORCE_INLINE void *taosEncodeVariantU32(void *buf, uint32_t value) { int i = 0; + while (value >= ENCODE_LIMIT) { + ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); + value >>= 7; + i++; + ASSERT(i < 5); + } + + ((uint8_t *)buf)[i] = value; + + return POINTER_SHIFT(buf, i + 1); +} + +static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) { + int i = 0; uint32_t tval = 0; *value = 0; while (i < 5) { @@ -199,8 +253,35 @@ static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) { return NULL; // error happened } -static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) { +// ---- Variant I32 +static FORCE_INLINE void *taosEncodeVariantI32(void *buf, int32_t value) { + return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) { + uint32_t tvalue = 0; + void * ret = taosDecodeVariantU32(buf, &tvalue); + *value = ZIGZAGD(int32_t, tvalue); + return ret; +} + +// ---- Variant U64 +static FORCE_INLINE void *taosEncodeVariantU64(void *buf, uint64_t value) { int i = 0; + while (value >= ENCODE_LIMIT) { + ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); + value >>= 7; + i++; + ASSERT(i < 10); + } + + ((uint8_t *)buf)[i] = value; + + return POINTER_SHIFT(buf, i + 1); +} + +static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) { + int i = 0; uint64_t tval = 0; *value = 0; while (i < 10) { @@ -217,10 +298,23 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) { return NULL; // error happened } +// ---- Variant I64 +static FORCE_INLINE void *taosEncodeVariantI64(void *buf, int64_t value) { + return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) { + uint64_t tvalue = 0; + void * ret = taosDecodeVariantU64(buf, &tvalue); + *value = ZIGZAGD(int64_t, tvalue); + return ret; +} + +// ---- string static FORCE_INLINE void *taosEncodeString(void *buf, char *value) { size_t size = strlen(value); - buf = taosEncodeVariant64(buf, size); + buf = taosEncodeVariantU64(buf, size); memcpy(buf, value, size); return POINTER_SHIFT(buf, size); @@ -229,7 +323,7 @@ static FORCE_INLINE void *taosEncodeString(void *buf, char *value) { static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { uint64_t size = 0; - buf = taosDecodeVariant64(buf, &size); + buf = taosDecodeVariantU64(buf, &size); *value = (char *)malloc(size + 1); if (*value == NULL) return NULL; memcpy(*value, buf, size); diff --git a/src/util/tests/codingTests.cpp b/src/util/tests/codingTests.cpp index a72c7ef291..036ed0bf83 100644 --- a/src/util/tests/codingTests.cpp +++ b/src/util/tests/codingTests.cpp @@ -9,8 +9,8 @@ static bool test_fixed_uint16(uint16_t value) { char buf[20] = "\0"; uint16_t value_check = 0; - void *ptr1 = taosEncodeFixed16(static_cast(buf), value); - void *ptr2 = taosDecodeFixed16(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU16(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU16(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -19,8 +19,8 @@ static bool test_fixed_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; - void *ptr1 = taosEncodeFixed32(static_cast(buf), value); - void *ptr2 = taosDecodeFixed32(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU32(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU32(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -29,8 +29,8 @@ static bool test_fixed_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; - void *ptr1 = taosEncodeFixed64(static_cast(buf), value); - void *ptr2 = taosDecodeFixed64(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU64(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU64(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -39,8 +39,8 @@ static bool test_variant_uint16(uint16_t value) { char buf[20] = "\0"; uint16_t value_check = 0; - void *ptr1 = taosEncodeVariant16(static_cast(buf), value); - void *ptr2 = taosDecodeVariant16(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU16(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU16(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -49,8 +49,8 @@ static bool test_variant_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; - void *ptr1 = taosEncodeVariant32(static_cast(buf), value); - void *ptr2 = taosDecodeVariant32(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU32(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU32(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -59,8 +59,8 @@ static bool test_variant_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; - void *ptr1 = taosEncodeVariant64(static_cast(buf), value); - void *ptr2 = taosDecodeVariant64(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU64(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU64(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } From cf1acba45b1cf3d3aec0adf507d8fb0ba2826373 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 29 May 2020 14:59:21 +0000 Subject: [PATCH 02/13] add more encoding tests --- src/util/tests/codingTests.cpp | 144 ++++++++++++++++++++++++++++++--- 1 file changed, 133 insertions(+), 11 deletions(-) diff --git a/src/util/tests/codingTests.cpp b/src/util/tests/codingTests.cpp index 036ed0bf83..57e21a828c 100644 --- a/src/util/tests/codingTests.cpp +++ b/src/util/tests/codingTests.cpp @@ -15,6 +15,16 @@ static bool test_fixed_uint16(uint16_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_fixed_int16(int16_t value) { + char buf[20] = "\0"; + int16_t value_check = 0; + + void *ptr1 = taosEncodeFixedI16(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI16(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + static bool test_fixed_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; @@ -25,6 +35,16 @@ static bool test_fixed_uint32(uint32_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_fixed_int32(int32_t value) { + char buf[20] = "\0"; + int32_t value_check = 0; + + void *ptr1 = taosEncodeFixedI32(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI32(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + static bool test_fixed_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; @@ -35,6 +55,16 @@ static bool test_fixed_uint64(uint64_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_fixed_int64(int64_t value) { + char buf[20] = "\0"; + int64_t value_check = 0; + + void *ptr1 = taosEncodeFixedI64(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI64(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + static bool test_variant_uint16(uint16_t value) { char buf[20] = "\0"; uint16_t value_check = 0; @@ -45,6 +75,16 @@ static bool test_variant_uint16(uint16_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_variant_int16(int16_t value) { + char buf[20] = "\0"; + int16_t value_check = 0; + + void *ptr1 = taosEncodeVariantI16(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI16(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + static bool test_variant_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; @@ -55,6 +95,16 @@ static bool test_variant_uint32(uint32_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_variant_int32(int32_t value) { + char buf[20] = "\0"; + int32_t value_check = 0; + + void *ptr1 = taosEncodeVariantI32(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI32(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + static bool test_variant_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; @@ -65,52 +115,124 @@ static bool test_variant_uint64(uint64_t value) { return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } +static bool test_variant_int64(int64_t value) { + char buf[20] = "\0"; + int64_t value_check = 0; + + void *ptr1 = taosEncodeVariantI64(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI64(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + TEST(codingTest, fixed_encode_decode) { srand(time(0)); + // uint16_t for (uint16_t value = 0; value <= UINT16_MAX; value++) { ASSERT_TRUE(test_fixed_uint16(value)); if (value == UINT16_MAX) break; } - ASSERT_TRUE(test_fixed_uint32(0)); - ASSERT_TRUE(test_fixed_uint32(UINT32_MAX)); - - for (int i = 0; i < 1000000; i++) { - ASSERT_TRUE(test_fixed_uint32(rand())); + // int16_t + for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) { + ASSERT_TRUE(test_fixed_int16(value)); + if (value == INT16_MAX) break; } - std::mt19937_64 gen (std::random_device{}()); + std::mt19937 gen32(std::random_device{}()); + // uint32_t + ASSERT_TRUE(test_fixed_uint32(0)); + ASSERT_TRUE(test_fixed_uint32(UINT32_MAX)); + std::uniform_int_distribution distr1(0, UINT32_MAX); + + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_uint32(distr1(gen32))); + } + + // int32_t + ASSERT_TRUE(test_fixed_int32(INT32_MIN)); + ASSERT_TRUE(test_fixed_int32(INT32_MAX)); + std::uniform_int_distribution distr2(INT32_MIN, INT32_MAX); + + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_int32(distr2(gen32))); + } + + std::mt19937_64 gen64(std::random_device{}()); + // uint64_t + std::uniform_int_distribution distr3(0, UINT64_MAX); ASSERT_TRUE(test_fixed_uint64(0)); ASSERT_TRUE(test_fixed_uint64(UINT64_MAX)); for (int i = 0; i < 1000000; i++) { - ASSERT_TRUE(test_fixed_uint64(gen())); + ASSERT_TRUE(test_fixed_uint64(distr3(gen64))); + } + + // int64_t + std::uniform_int_distribution distr4(INT64_MIN, INT64_MAX); + + ASSERT_TRUE(test_fixed_int64(INT64_MIN)); + ASSERT_TRUE(test_fixed_int64(INT64_MAX)); + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_int64(distr4(gen64))); } } TEST(codingTest, variant_encode_decode) { srand(time(0)); + // uint16_t for (uint16_t value = 0; value <= UINT16_MAX; value++) { ASSERT_TRUE(test_variant_uint16(value)); if (value == UINT16_MAX) break; } + // int16_t + for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) { + ASSERT_TRUE(test_variant_int16(value)); + if (value == INT16_MAX) break; + } + + std::mt19937 gen32(std::random_device{}()); + // uint32_t + std::uniform_int_distribution distr1(0, UINT32_MAX); ASSERT_TRUE(test_variant_uint32(0)); ASSERT_TRUE(test_variant_uint32(UINT32_MAX)); for (int i = 0; i < 5000000; i++) { - ASSERT_TRUE(test_variant_uint32(rand())); + ASSERT_TRUE(test_variant_uint32(distr1(gen32))); } - std::mt19937_64 gen (std::random_device{}()); + // int32_t + std::uniform_int_distribution distr2(INT32_MIN, INT32_MAX); + ASSERT_TRUE(test_variant_int32(INT32_MIN)); + ASSERT_TRUE(test_variant_int32(INT32_MAX)); + + for (int i = 0; i < 5000000; i++) { + ASSERT_TRUE(test_variant_int32(distr2(gen32))); + } + + std::mt19937_64 gen64(std::random_device{}()); + // uint64_t + std::uniform_int_distribution distr3(0, UINT64_MAX); ASSERT_TRUE(test_variant_uint64(0)); ASSERT_TRUE(test_variant_uint64(UINT64_MAX)); for (int i = 0; i < 5000000; i++) { - uint64_t value = gen(); + // uint64_t value = gen(); // printf("%ull\n", value); - ASSERT_TRUE(test_variant_uint64(value)); + ASSERT_TRUE(test_variant_uint64(distr3(gen64))); + } + + // int64_t + std::uniform_int_distribution distr4(INT64_MIN, INT64_MAX); + + ASSERT_TRUE(test_variant_int64(INT64_MIN)); + ASSERT_TRUE(test_variant_int64(INT64_MAX)); + for (int i = 0; i < 5000000; i++) { + // uint64_t value = gen(); + // printf("%ull\n", value); + ASSERT_TRUE(test_variant_int64(distr4(gen64))); } } \ No newline at end of file From 540be24bd3616767d9c617134602ad6721410c21 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 30 May 2020 02:25:02 +0000 Subject: [PATCH 03/13] reformat --- src/common/inc/tdataformat.h | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 528e9b2825..3f33c96e96 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -243,31 +243,31 @@ pData //#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) typedef struct { - int16_t colId; // column ID - int16_t colType; - uint16_t offset; //to store value for numeric col or offset for binary/Nchar + int16_t colId; // column ID + int16_t colType; + uint16_t offset; // to store value for numeric col or offset for binary/Nchar } STagCol; typedef struct { - int32_t len; - void * pData; // Space to store the tag value - uint16_t dataLen; - int16_t ncols; // Total columns allocated - STagCol tagCols[]; + int32_t len; + void * pData; // Space to store the tag value + uint16_t dataLen; + int16_t ncols; // Total columns allocated + STagCol tagCols[]; } STagRow; - #define tagColSize(r) (sizeof(STagCol) + r.colLen) -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information -int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information -void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1; -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId); -SDataRow tdTagRowDup(SDataRow row); -void tdFreeTagRow(SDataRow row); -SDataRow tdTagRowDecode(SDataRow row); -int tdTagRowCpy(SDataRow dst, SDataRow src); -void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags); +int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, + int16_t colId); // insert tag value and update all the information +int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information +void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); // if find tag, 0, else return -1; +int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId); +SDataRow tdTagRowDup(SDataRow row); +void tdFreeTagRow(SDataRow row); +SDataRow tdTagRowDecode(SDataRow row); +int tdTagRowCpy(SDataRow dst, SDataRow src); +void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags); STSchema *tdGetSchemaFromData(SDataRow *row); #ifdef __cplusplus From 8c7100e89400566e164d417a56b2ae1baf1a2878 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 30 May 2020 10:00:49 +0000 Subject: [PATCH 04/13] add kv data row definition --- src/common/inc/tdataformat.h | 49 +++++++++++++++++++++ src/common/src/tdataformat.c | 85 ++++++++++++++++++++++++++++++++++++ src/inc/taosdef.h | 1 + src/util/inc/tutil.h | 1 + 4 files changed, 136 insertions(+) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 3f33c96e96..a9350769a1 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -19,6 +19,7 @@ #include #include +#include "talgo.h" #include "taosdef.h" #include "tutil.h" @@ -218,6 +219,54 @@ int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); +// ----------------- K-V data row structure +/* + * +----------+----------+---------------------------------+---------------------------------+ + * | int16_t | int16_t | | | + * +----------+----------+---------------------------------+---------------------------------+ + * | len | ncols | cols index | data part | + * +----------+----------+---------------------------------+---------------------------------+ + */ +typedef void *SKVDataRow; + +typedef struct { + int16_t colId; + int16_t offset; +} SColIdx; + +#define TD_KV_DATA_ROW_HEAD_SIZE 2*sizeof(int16_t) + +#define kvDataRowLen(r) (*(int16_t *)(r)) +#define kvDataRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define kvDataRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_DATA_ROW_HEAD_SIZE) +#define kvDataRowValues(r) POINTER_SHIFT(r, TD_KV_DATA_ROW_HEAD_SIZE + sizeof(SColIdx) * kvDataRowNCols(r)) +#define kvDataRowCpy(dst, r) memcpy((dst), (r), kvDataRowLen(r)) +#define kvDataRowColVal(r, colIdx) POINTER_SHIFT(kvDataRowValues(r), (colIdx)->offset) +#define kvDataRowSetLen(r, len) kvDataRowLen(r) = (len) +#define kvDataRowSetNCols(r, n) kvDataRowNCols(r) = (n) +#define kvDataRowColIdxAt(r, i) (kvDataRowColIdx(r) + (i)) + +SKVDataRow tdKVDataRowDup(SKVDataRow row); +SKVDataRow tdSetKVRowDataOfCol(SKVDataRow row, int16_t colId, int8_t type, void *value); +void * tdEncodeKVDataRow(void *buf, SKVDataRow row); +void * tdDecodeKVDataRow(void *buf, SKVDataRow *row); + +static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { + if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) { + return 1; + } else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) { + return -1; + } else { + return 0; + } +} + +static FORCE_INLINE void *tdGetKVRowDataOfCol(SKVDataRow row, int16_t colId) { + void *ret = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); + if (ret == NULL) return NULL; + return kvDataRowColVal(row, (SColIdx *)ret); +} + // ----------------- Tag row structure /* A tag row, the format is like below: diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 922c8bdea0..3429970e73 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -594,4 +594,89 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol (*iter2)++; } } +} + +SKVDataRow tdKVDataRowDup(SKVDataRow row) { + SKVDataRow trow = malloc(kvDataRowLen(row)); + if (trow == NULL) return NULL; + + kvDataRowCpy(trow, row); + return trow; +} + +SKVDataRow tdSetKVRowDataOfCol(SKVDataRow row, int16_t colId, int8_t type, void *value) { + // TODO + return NULL; + // SColIdx *pColIdx = NULL; + // SKVDataRow rrow = row; + // SKVDataRow nrow = NULL; + // void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); + + // if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row + // int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]); + // nrow = malloc(tlen); + // if (nrow == NULL) return NULL; + + // kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1); + // kvDataRowSetLen(nrow, tlen); + + // if (ptr == NULL) ptr = kvDataRowValues(row); + + // // Copy the columns before the col + // if (POINTER_DISTANCE(ptr, kvDataRowColIdx(row)) > 0) { + // memcpy(kvDataRowColIdx(nrow), kvDataRowColIdx(row), POINTER_DISTANCE(ptr, kvDataRowColIdx(row))); + // memcpy(kvDataRowValues(nrow), kvDataRowValues(row), ((SColIdx *)ptr)->offset); // TODO: here is not correct + // } + + // // Set the new col value + // pColIdx = (SColIdx *)POINTER_SHIFT(nrow, POINTER_DISTANCE(ptr, row)); + // pColIdx->colId = colId; + // pColIdx->offset = ((SColIdx *)ptr)->offset; // TODO: here is not correct + + // if (IS_VAR_DATA_TYPE(type)) { + // memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, varDataLen(value)); + // } else { + // memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, TYPE_BYTES[type]); + // } + + // // Copy the columns after the col + // if (POINTER_DISTANCE(kvDataRowValues(row), ptr) > 0) { + // // TODO: memcpy(); + // } + // } else { + // // TODO + // ASSERT(((SColIdx *)ptr)->colId == colId); + // if (IS_VAR_DATA_TYPE(type)) { + // void *pOldVal = kvDataRowColVal(row, (SColIdx *)ptr); + + // if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place + // memcpy(pOldVal, value, varDataTLen(value)); + // } else { // enlarge the memory + // // rrow = realloc(rrow, kvDataRowLen(rrow) + varDataTLen(value) - varDataTLen(pOldVal)); + // // if (rrow == NULL) return NULL; + // // memmove(); + // // for () { + // // ((SColIdx *)ptr)->offset += balabala; + // // } + + // // kvDataRowSetLen(); + + // } + // } else { + // memcpy(kvDataRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]); + // } + // } + + // return rrow; +} + +void *tdEncodeKVDataRow(void *buf, SKVDataRow row) { + // May change the encode purpose + kvDataRowCpy(buf, row); + return POINTER_SHIFT(buf, kvDataRowLen(row)); +} + +void *tdDecodeKVDataRow(void *buf, SKVDataRow *row) { + *row = tdKVDataRowDup(buf); + return POINTER_SHIFT(buf, kvDataRowLen(*row)); } \ No newline at end of file diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 548a39ad42..68efcb8b81 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -52,6 +52,7 @@ typedef struct tstr { #define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len)) +#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) // this data type is internally used only in 'in' query to hold the values #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 5dcb6e406f..527394d13a 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -46,6 +46,7 @@ extern "C" { // Pointer p drift right by b bytes #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) +#define POINTER_DISTANCE(p1, p2) ((char *)p1 - (char *)p2) #ifndef NDEBUG #define ASSERT(x) assert(x) From 8a2802920af133b55544b9686d62d9ccfefa367c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 30 May 2020 10:48:09 +0000 Subject: [PATCH 05/13] add KV data row builder --- src/common/inc/tdataformat.h | 16 +++++++++ src/common/src/tdataformat.c | 68 ++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index a9350769a1..74f636e3f7 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -267,6 +267,22 @@ static FORCE_INLINE void *tdGetKVRowDataOfCol(SKVDataRow row, int16_t colId) { return kvDataRowColVal(row, (SColIdx *)ret); } +// ----------------- K-V data row builder +typedef struct { + int16_t tCols; + int16_t nCols; + SColIdx *pColIdx; + int16_t alloc; + int16_t size; + void * buf; +} SKVDataRowBuilder; + +int tdInitKVDataRowBuilder(SKVDataRowBuilder *pBuilder); +void tdDestroyKVDataRowBuilder(SKVDataRowBuilder *pBuilder); +void tdResetKVDataRowBuilder(SKVDataRowBuilder *pBuilder); +SKVDataRow tdGetKVDataRowFromBuilder(SKVDataRowBuilder *pBuilder); +int tdAddColToKVDataRow(SKVDataRowBuilder *pBuilder, int16_t colId, int8_t type, void *value); + // ----------------- Tag row structure /* A tag row, the format is like below: diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 3429970e73..2658d8f280 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -679,4 +679,72 @@ void *tdEncodeKVDataRow(void *buf, SKVDataRow row) { void *tdDecodeKVDataRow(void *buf, SKVDataRow *row) { *row = tdKVDataRowDup(buf); return POINTER_SHIFT(buf, kvDataRowLen(*row)); +} + +int tdInitKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { + pBuilder->tCols = 128; + pBuilder->nCols = 0; + pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + pBuilder->alloc = 1024; + pBuilder->size = 0; + pBuilder->buf = malloc(pBuilder->alloc); + if (pBuilder->buf == NULL) { + free(pBuilder->pColIdx); + return -1; + } + return 0; +} + +void tdDestroyKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { + tfree(pBuilder->pColIdx); + tfree(pBuilder->buf); +} + +void tdResetKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { + pBuilder->nCols = 0; + pBuilder->size = 0; +} + +SKVDataRow tdGetKVDataRowFromBuilder(SKVDataRowBuilder *pBuilder) { + int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; + if (tlen == 0) return NULL; + + SKVDataRow row = malloc(TD_KV_DATA_ROW_HEAD_SIZE + tlen); + if (row == NULL) return NULL; + + kvDataRowSetNCols(row, pBuilder->nCols); + kvDataRowSetLen(row, TD_KV_DATA_ROW_HEAD_SIZE + tlen); + + memcpy(kvDataRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + memcpy(kvDataRowValues(row), pBuilder->buf, pBuilder->size); + + return row; +} + +int tdAddColToKVDataRow(SKVDataRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { + ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId); + + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + pBuilder->pColIdx = realloc(pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + } + + pBuilder->pColIdx[pBuilder->nCols].colId = colId; + pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; + + pBuilder->nCols++; + + int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + if (tlen > pBuilder->alloc - pBuilder->size) { + pBuilder->alloc *= 2; + pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); + if (pBuilder->buf == NULL) return -1; + } + + memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); + pBuilder->size += tlen; + + return 0; } \ No newline at end of file From 760b66e5748e4e3c42b38c73c4a42d0aaefa507f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 01:57:55 +0000 Subject: [PATCH 06/13] refactor and rename --- src/common/inc/tdataformat.h | 92 +++++++++++------------ src/common/src/tdataformat.c | 139 +++++++++++++++++------------------ 2 files changed, 116 insertions(+), 115 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 74f636e3f7..47a1d642e1 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -27,19 +27,24 @@ extern "C" { #endif -#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \ - *(VarDataLenT*)(x) = __len; \ - strncpy(varDataVal(x), (str), __len);} while(0); +#define STR_TO_VARSTR(x, str) \ + do { \ + VarDataLenT __len = strlen(str); \ + *(VarDataLenT *)(x) = __len; \ + strncpy(varDataVal(x), (str), __len); \ + } while (0); -#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\ - char* _e = stpncpy(varDataVal(x), (str), (_maxs));\ - varDataSetLen(x, (_e - (x) - VARSTR_HEADER_SIZE));\ -} while(0) +#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ + do { \ + char *_e = stpncpy(varDataVal(x), (str), (_maxs)); \ + varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ + } while (0) -#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\ - *(VarDataLenT*)(x) = (_size); \ - strncpy(varDataVal(x), (str), (_size));\ -} while(0); +#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ + do { \ + *(VarDataLenT *)(x) = (_size); \ + strncpy(varDataVal(x), (str), (_size)); \ + } while (0); // ----------------- TSDB COLUMN DEFINITION typedef struct { @@ -73,9 +78,9 @@ typedef struct { #define schemaTLen(s) ((s)->tlen) #define schemaFLen(s) ((s)->flen) #define schemaColAt(s, i) ((s)->columns + i) +#define tdFreeSchema(s) tfree((s)) STSchema *tdNewSchema(int32_t nCols); -#define tdFreeSchema(s) tfree((s)) int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); STSchema *tdDupSchema(STSchema *pSchema); int tdGetSchemaEncodeSize(STSchema *pSchema); @@ -189,12 +194,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { } } - typedef struct { - int maxRowSize; - int maxCols; // max number of columns - int maxPoints; // max number of points - int bufSize; + int maxRowSize; + int maxCols; // max number of columns + int maxPoints; // max number of points + int bufSize; int numOfRows; int numOfCols; // Total number of cols @@ -214,11 +218,10 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); -void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); - // ----------------- K-V data row structure /* * +----------+----------+---------------------------------+---------------------------------+ @@ -227,29 +230,29 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SD * | len | ncols | cols index | data part | * +----------+----------+---------------------------------+---------------------------------+ */ -typedef void *SKVDataRow; +typedef void *SKVRow; typedef struct { int16_t colId; int16_t offset; } SColIdx; -#define TD_KV_DATA_ROW_HEAD_SIZE 2*sizeof(int16_t) +#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t) -#define kvDataRowLen(r) (*(int16_t *)(r)) -#define kvDataRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) -#define kvDataRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_DATA_ROW_HEAD_SIZE) -#define kvDataRowValues(r) POINTER_SHIFT(r, TD_KV_DATA_ROW_HEAD_SIZE + sizeof(SColIdx) * kvDataRowNCols(r)) -#define kvDataRowCpy(dst, r) memcpy((dst), (r), kvDataRowLen(r)) -#define kvDataRowColVal(r, colIdx) POINTER_SHIFT(kvDataRowValues(r), (colIdx)->offset) -#define kvDataRowSetLen(r, len) kvDataRowLen(r) = (len) -#define kvDataRowSetNCols(r, n) kvDataRowNCols(r) = (n) -#define kvDataRowColIdxAt(r, i) (kvDataRowColIdx(r) + (i)) +#define kvRowLen(r) (*(int16_t *)(r)) +#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE) +#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r)) +#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) +#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) +#define kvRowSetLen(r, len) kvRowLen(r) = (len) +#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) +#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) -SKVDataRow tdKVDataRowDup(SKVDataRow row); -SKVDataRow tdSetKVRowDataOfCol(SKVDataRow row, int16_t colId, int8_t type, void *value); -void * tdEncodeKVDataRow(void *buf, SKVDataRow row); -void * tdDecodeKVDataRow(void *buf, SKVDataRow *row); +SKVRow tdKVRowDup(SKVRow row); +SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value); +void * tdEncodeKVRow(void *buf, SKVRow row); +void * tdDecodeKVRow(void *buf, SKVRow *row); static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) { @@ -261,10 +264,10 @@ static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { } } -static FORCE_INLINE void *tdGetKVRowDataOfCol(SKVDataRow row, int16_t colId) { - void *ret = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); +static FORCE_INLINE void *tdGetKVRowDataOfCol(SKVRow row, int16_t colId) { + void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); if (ret == NULL) return NULL; - return kvDataRowColVal(row, (SColIdx *)ret); + return kvRowColVal(row, (SColIdx *)ret); } // ----------------- K-V data row builder @@ -275,19 +278,19 @@ typedef struct { int16_t alloc; int16_t size; void * buf; -} SKVDataRowBuilder; +} SKVRowBuilder; -int tdInitKVDataRowBuilder(SKVDataRowBuilder *pBuilder); -void tdDestroyKVDataRowBuilder(SKVDataRowBuilder *pBuilder); -void tdResetKVDataRowBuilder(SKVDataRowBuilder *pBuilder); -SKVDataRow tdGetKVDataRowFromBuilder(SKVDataRowBuilder *pBuilder); -int tdAddColToKVDataRow(SKVDataRowBuilder *pBuilder, int16_t colId, int8_t type, void *value); +int tdInitKVRowBuilder(SKVRowBuilder *pBuilder); +void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder); +void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); +SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); +int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value); // ----------------- Tag row structure /* A tag row, the format is like below: +----------+----------------------------------------------------------------+ -| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol | +| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol | +----------+----------------------------------------------------------------+ pData @@ -297,7 +300,6 @@ pData */ - #define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t) #define tagRowNum(r) (*(int16_t *)(r)) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 2658d8f280..333e15058d 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ #include "tdataformat.h" -#include "wchar.h" #include "talgo.h" +#include "wchar.h" /** * Create a SSchema object with nCols columns @@ -51,13 +51,13 @@ int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) if (schemaNCols(pSchema) == 0) { colSetOffset(pCol, 0); } else { - STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1); + STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1); colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); } switch (type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); // Set as maximum bytes + colSetBytes(pCol, bytes); // Set as maximum bytes pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); break; default: @@ -152,17 +152,18 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return row; } -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information +int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, + int16_t colId) { // insert/update tag value and update all the information ASSERT(((STagRow *)row)->pData != NULL); - //STagCol * stCol = tdQueryTagColByID() + // STagCol * stCol = tdQueryTagColByID() return 0; -}; +}; -int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information - //todo +int tdDeleteTagCol(SDataRow row, int16_t colId) { // delete tag value and update all the information + // todo return 0; -}; +}; static int compTagId(const void *key1, const void *key2) { if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) { @@ -177,43 +178,43 @@ static int compTagId(const void *key1, const void *key2) { /** * Find tag structure by colId, if find, return tag structure, else return NULL; */ -STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1; +STagCol *tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { // if find tag, 0, else return -1; ASSERT(((STagRow *)row)->pData != NULL); STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId,0,0}; - STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags); + int16_t nCols = ((STagRow *)row)->ncols; + STagCol key = {colId, 0, 0}; + STagCol *stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags); return stCol; -}; +}; /** -* Find tag value by colId, if find, return tag value, else return NULL; -*/ -void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) { + * Find tag value by colId, if find, return tag value, else return NULL; + */ +void *tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) { ASSERT(((STagRow *)row)->pData != NULL); STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId,0,0}; - STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ); + int16_t nCols = ((STagRow *)row)->ncols; + STagCol key = {colId, 0, 0}; + STagCol *stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ); if (NULL == stCol) { return NULL; } - - void * pData = ((STagRow *)row)->pData; + + void *pData = ((STagRow *)row)->pData; *type = stCol->colType; return pData + stCol->offset; -}; +}; -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){ +int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId) { ASSERT(value != NULL); - //ASSERT(bytes-2 == varDataTLen(value)); + // ASSERT(bytes-2 == varDataTLen(value)); ASSERT(row != NULL); STagRow *pTagrow = row; pTagrow->tagCols[pTagrow->ncols].colId = colId; pTagrow->tagCols[pTagrow->ncols].colType = type; pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen; - + switch (type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: @@ -224,14 +225,14 @@ int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]); pTagrow->dataLen += TYPE_BYTES[type]; break; - } - - pTagrow->ncols++; + } + + pTagrow->ncols++; return 0; -}; +}; -void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { +void *tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol); STagRow *row = malloc(size); @@ -245,25 +246,25 @@ void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { } row->len = size; - row->dataLen = 0; - row->ncols = 0; - return row; + row->dataLen = 0; + row->ncols = 0; + return row; } /** - * free tag row + * free tag row */ - + void tdFreeTagRow(SDataRow row) { if (row) { free(((STagRow *)row)->pData); free(row); - } + } } SDataRow tdTagRowDup(SDataRow row) { STagRow *trow = malloc(dataRowLen(row)); if (trow == NULL) return NULL; - + dataRowCpy(trow, row); trow->pData = malloc(trow->dataLen); if (NULL == trow->pData) { @@ -277,23 +278,23 @@ SDataRow tdTagRowDup(SDataRow row) { SDataRow tdTagRowDecode(SDataRow row) { STagRow *trow = malloc(dataRowLen(row)); if (trow == NULL) return NULL; - + dataRowCpy(trow, row); trow->pData = malloc(trow->dataLen); if (NULL == trow->pData) { free(trow); return NULL; } - char * pData = (char *)row + dataRowLen(row); + char *pData = (char *)row + dataRowLen(row); memcpy(trow->pData, pData, trow->dataLen); return trow; } int tdTagRowCpy(SDataRow dst, SDataRow src) { if (src == NULL) return -1; - + dataRowCpy(dst, src); - void * pData = dst + dataRowLen(src); + void *pData = dst + dataRowLen(src); memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen); return 0; } @@ -330,7 +331,6 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) pDataCol->pData = *pBuf; *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); } - } void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { @@ -414,7 +414,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { void dataColSetOffset(SDataCol *pCol, int nEle) { ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); - void * tptr = pCol->pData; + void *tptr = pCol->pData; // char *tptr = (char *)(pCol->pData); VarDataOffsetT offset = 0; @@ -596,26 +596,25 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol } } -SKVDataRow tdKVDataRowDup(SKVDataRow row) { - SKVDataRow trow = malloc(kvDataRowLen(row)); +SKVRow tdKVRowDup(SKVRow row) { + SKVRow trow = malloc(kvRowLen(row)); if (trow == NULL) return NULL; - kvDataRowCpy(trow, row); + kvRowCpy(trow, row); return trow; } -SKVDataRow tdSetKVRowDataOfCol(SKVDataRow row, int16_t colId, int8_t type, void *value) { +SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) { // TODO return NULL; // SColIdx *pColIdx = NULL; - // SKVDataRow rrow = row; - // SKVDataRow nrow = NULL; + // SKVRow rrow = row; + // SKVRow nrow = NULL; // void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); // if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row - // int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]); - // nrow = malloc(tlen); - // if (nrow == NULL) return NULL; + // int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : + // TYPE_BYTES[type]); nrow = malloc(tlen); if (nrow == NULL) return NULL; // kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1); // kvDataRowSetLen(nrow, tlen); @@ -670,18 +669,18 @@ SKVDataRow tdSetKVRowDataOfCol(SKVDataRow row, int16_t colId, int8_t type, void // return rrow; } -void *tdEncodeKVDataRow(void *buf, SKVDataRow row) { +void *tdEncodeKVRow(void *buf, SKVRow row) { // May change the encode purpose - kvDataRowCpy(buf, row); - return POINTER_SHIFT(buf, kvDataRowLen(row)); + kvRowCpy(buf, row); + return POINTER_SHIFT(buf, kvRowLen(row)); } -void *tdDecodeKVDataRow(void *buf, SKVDataRow *row) { - *row = tdKVDataRowDup(buf); - return POINTER_SHIFT(buf, kvDataRowLen(*row)); +void *tdDecodeKVRow(void *buf, SKVRow *row) { + *row = tdKVRowDup(buf); + return POINTER_SHIFT(buf, kvRowLen(*row)); } -int tdInitKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { +int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) { pBuilder->tCols = 128; pBuilder->nCols = 0; pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols); @@ -696,33 +695,33 @@ int tdInitKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { return 0; } -void tdDestroyKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { +void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) { tfree(pBuilder->pColIdx); tfree(pBuilder->buf); } -void tdResetKVDataRowBuilder(SKVDataRowBuilder *pBuilder) { +void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { pBuilder->nCols = 0; pBuilder->size = 0; } -SKVDataRow tdGetKVDataRowFromBuilder(SKVDataRowBuilder *pBuilder) { +SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; if (tlen == 0) return NULL; - SKVDataRow row = malloc(TD_KV_DATA_ROW_HEAD_SIZE + tlen); + SKVRow row = malloc(TD_KV_ROW_HEAD_SIZE + tlen); if (row == NULL) return NULL; - kvDataRowSetNCols(row, pBuilder->nCols); - kvDataRowSetLen(row, TD_KV_DATA_ROW_HEAD_SIZE + tlen); + kvRowSetNCols(row, pBuilder->nCols); + kvRowSetLen(row, TD_KV_ROW_HEAD_SIZE + tlen); - memcpy(kvDataRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); - memcpy(kvDataRowValues(row), pBuilder->buf, pBuilder->size); + memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); return row; } -int tdAddColToKVDataRow(SKVDataRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { +int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId); if (pBuilder->nCols >= pBuilder->tCols) { @@ -733,7 +732,7 @@ int tdAddColToKVDataRow(SKVDataRowBuilder *pBuilder, int16_t colId, int8_t type, pBuilder->pColIdx[pBuilder->nCols].colId = colId; pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; - + pBuilder->nCols++; int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; From d2b46fb968eb42bc3394a09c2f7e488e7ab4ea38 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 03:14:46 +0000 Subject: [PATCH 07/13] add fsync when close a file --- src/tsdb/src/tsdbRWHelper.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 38e6b8f2f6..84a107be5a 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -237,20 +237,24 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.headF.fd > 0) { + fsync(pHelper->files.headF.fd); close(pHelper->files.headF.fd); pHelper->files.headF.fd = -1; } if (pHelper->files.dataF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); + fsync(pHelper->files.dataF.fd); close(pHelper->files.dataF.fd); pHelper->files.dataF.fd = -1; } if (pHelper->files.lastF.fd > 0) { + fsync(pHelper->files.lastF.fd); close(pHelper->files.lastF.fd); pHelper->files.lastF.fd = -1; } if (pHelper->files.nHeadF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); + fsync(pHelper->files.nHeadF.fd); close(pHelper->files.nHeadF.fd); pHelper->files.nHeadF.fd = -1; if (hasError) { @@ -263,6 +267,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.nLastF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); + fsync(pHelper->files.nLastF.fd); close(pHelper->files.nLastF.fd); pHelper->files.nLastF.fd = -1; if (hasError) { From 8d5d0fd4efac0b759324c86379863ad016fd8c32 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 03:55:36 +0000 Subject: [PATCH 08/13] refactor --- src/common/inc/tdataformat.h | 6 +++--- src/common/src/tdataformat.c | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 47a1d642e1..19cbbea0fb 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -241,12 +241,12 @@ typedef struct { #define kvRowLen(r) (*(int16_t *)(r)) #define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define kvRowSetLen(r, len) kvRowLen(r) = (len) +#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) #define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE) #define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r)) #define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) #define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) -#define kvRowSetLen(r, len) kvRowLen(r) = (len) -#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) SKVRow tdKVRowDup(SKVRow row); @@ -264,7 +264,7 @@ static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { } } -static FORCE_INLINE void *tdGetKVRowDataOfCol(SKVRow row, int16_t colId) { +static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) { void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); if (ret == NULL) return NULL; return kvRowColVal(row, (SColIdx *)ret); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 333e15058d..536135010a 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -709,11 +709,13 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; if (tlen == 0) return NULL; - SKVRow row = malloc(TD_KV_ROW_HEAD_SIZE + tlen); + tlen += TD_KV_ROW_HEAD_SIZE; + + SKVRow row = malloc(tlen); if (row == NULL) return NULL; kvRowSetNCols(row, pBuilder->nCols); - kvRowSetLen(row, TD_KV_ROW_HEAD_SIZE + tlen); + kvRowSetLen(row, tlen); memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); @@ -737,7 +739,9 @@ int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *v int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; if (tlen > pBuilder->alloc - pBuilder->size) { - pBuilder->alloc *= 2; + while (tlen > pBuilder->alloc - pBuilder->size) { + pBuilder->alloc *= 2; + } pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); if (pBuilder->buf == NULL) return -1; } From a0f3a9e268ec203569aa87532e279c01acb95928 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 05:14:53 +0000 Subject: [PATCH 09/13] refactor --- src/common/inc/tdataformat.h | 30 +++++++++++++++++++++++++++++- src/common/src/tdataformat.c | 29 ----------------------------- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 19cbbea0fb..b1dbe6b3d7 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -284,7 +284,35 @@ int tdInitKVRowBuilder(SKVRowBuilder *pBuilder); void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder); void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); -int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value); + +static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { + ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId); + + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + pBuilder->pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + } + + pBuilder->pColIdx[pBuilder->nCols].colId = colId; + pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; + + pBuilder->nCols++; + + int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + if (tlen > pBuilder->alloc - pBuilder->size) { + while (tlen > pBuilder->alloc - pBuilder->size) { + pBuilder->alloc *= 2; + } + pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); + if (pBuilder->buf == NULL) return -1; + } + + memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); + pBuilder->size += tlen; + + return 0; +} // ----------------- Tag row structure diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 536135010a..e4ee5cc4bc 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -721,33 +721,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); return row; -} - -int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { - ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId); - - if (pBuilder->nCols >= pBuilder->tCols) { - pBuilder->tCols *= 2; - pBuilder->pColIdx = realloc(pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->tCols); - if (pBuilder->pColIdx == NULL) return -1; - } - - pBuilder->pColIdx[pBuilder->nCols].colId = colId; - pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; - - pBuilder->nCols++; - - int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; - if (tlen > pBuilder->alloc - pBuilder->size) { - while (tlen > pBuilder->alloc - pBuilder->size) { - pBuilder->alloc *= 2; - } - pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); - if (pBuilder->buf == NULL) return -1; - } - - memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); - pBuilder->size += tlen; - - return 0; } \ No newline at end of file From 4a1f43c983b1b5a3abfc4aad1c55933647e2b90f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 06:20:24 +0000 Subject: [PATCH 10/13] refactor tag function --- src/common/inc/tdataformat.h | 52 +------------ src/common/src/tdataformat.c | 146 ----------------------------------- src/inc/tsdb.h | 2 +- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbMain.c | 6 +- src/tsdb/src/tsdbMeta.c | 23 ++---- src/tsdb/src/tsdbRead.c | 12 +-- src/vnode/src/vnodeWrite.c | 10 +-- 8 files changed, 21 insertions(+), 232 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index b1dbe6b3d7..706fdeff98 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -248,6 +248,7 @@ typedef struct { #define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) #define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) +#define kvRowFree(r) tfree(r) SKVRow tdKVRowDup(SKVRow row); SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value); @@ -314,57 +315,6 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, return 0; } -// ----------------- Tag row structure - -/* A tag row, the format is like below: -+----------+----------------------------------------------------------------+ -| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol | -+----------+----------------------------------------------------------------+ - -pData -+----------+----------------------------------------------------------------+ -| value 1 | value 2 | value 3 | value 4 | ....|value n | -+----------+----------------------------------------------------------------+ - - */ - -#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t) - -#define tagRowNum(r) (*(int16_t *)(r)) -#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE) -//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) -//#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) -//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) -//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) - -typedef struct { - int16_t colId; // column ID - int16_t colType; - uint16_t offset; // to store value for numeric col or offset for binary/Nchar -} STagCol; - -typedef struct { - int32_t len; - void * pData; // Space to store the tag value - uint16_t dataLen; - int16_t ncols; // Total columns allocated - STagCol tagCols[]; -} STagRow; - -#define tagColSize(r) (sizeof(STagCol) + r.colLen) - -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, - int16_t colId); // insert tag value and update all the information -int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information -void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); // if find tag, 0, else return -1; -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId); -SDataRow tdTagRowDup(SDataRow row); -void tdFreeTagRow(SDataRow row); -SDataRow tdTagRowDecode(SDataRow row); -int tdTagRowCpy(SDataRow dst, SDataRow src); -void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags); -STSchema *tdGetSchemaFromData(SDataRow *row); - #ifdef __cplusplus } #endif diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index e4ee5cc4bc..7a35d5fb69 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -152,152 +152,6 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return row; } -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, - int16_t colId) { // insert/update tag value and update all the information - ASSERT(((STagRow *)row)->pData != NULL); - // STagCol * stCol = tdQueryTagColByID() - - return 0; -}; - -int tdDeleteTagCol(SDataRow row, int16_t colId) { // delete tag value and update all the information - // todo - return 0; -}; - -static int compTagId(const void *key1, const void *key2) { - if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) { - return 1; - } else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) { - return 0; - } else { - return -1; - } -} - -/** - * Find tag structure by colId, if find, return tag structure, else return NULL; - */ -STagCol *tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { // if find tag, 0, else return -1; - ASSERT(((STagRow *)row)->pData != NULL); - STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId, 0, 0}; - STagCol *stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags); - return stCol; -}; - -/** - * Find tag value by colId, if find, return tag value, else return NULL; - */ -void *tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) { - ASSERT(((STagRow *)row)->pData != NULL); - STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId, 0, 0}; - STagCol *stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ); - if (NULL == stCol) { - return NULL; - } - - void *pData = ((STagRow *)row)->pData; - *type = stCol->colType; - - return pData + stCol->offset; -}; - -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId) { - ASSERT(value != NULL); - // ASSERT(bytes-2 == varDataTLen(value)); - ASSERT(row != NULL); - STagRow *pTagrow = row; - pTagrow->tagCols[pTagrow->ncols].colId = colId; - pTagrow->tagCols[pTagrow->ncols].colType = type; - pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen; - - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value)); - pTagrow->dataLen += varDataTLen(value); - break; - default: - memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]); - pTagrow->dataLen += TYPE_BYTES[type]; - break; - } - - pTagrow->ncols++; - - return 0; -}; - -void *tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { - int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol); - - STagRow *row = malloc(size); - if (row == NULL) return NULL; - - int32_t datasize = pSchema->tlen; - row->pData = malloc(datasize); - if (NULL == row->pData) { - free(row); - return NULL; - } - - row->len = size; - row->dataLen = 0; - row->ncols = 0; - return row; -} -/** - * free tag row - */ - -void tdFreeTagRow(SDataRow row) { - if (row) { - free(((STagRow *)row)->pData); - free(row); - } -} - -SDataRow tdTagRowDup(SDataRow row) { - STagRow *trow = malloc(dataRowLen(row)); - if (trow == NULL) return NULL; - - dataRowCpy(trow, row); - trow->pData = malloc(trow->dataLen); - if (NULL == trow->pData) { - free(trow); - return NULL; - } - memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen); - return trow; -} - -SDataRow tdTagRowDecode(SDataRow row) { - STagRow *trow = malloc(dataRowLen(row)); - if (trow == NULL) return NULL; - - dataRowCpy(trow, row); - trow->pData = malloc(trow->dataLen); - if (NULL == trow->pData) { - free(trow); - return NULL; - } - char *pData = (char *)row + dataRowLen(row); - memcpy(trow->pData, pData, trow->dataLen); - return trow; -} - -int tdTagRowCpy(SDataRow dst, SDataRow src) { - if (src == NULL) return -1; - - dataRowCpy(dst, src); - void *pData = dst + dataRowLen(src); - memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen); - return 0; -} /** * Free the SDataRow object */ diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index c758d3aea4..fcfb6c6add 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -101,7 +101,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid); int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); -int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); +int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup); int tsdbTableSetName(STableCfg *config, char *name, bool dup); int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 9dd5136c95..e1b85ae99d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -76,7 +76,7 @@ typedef struct STable { int32_t sversion; STSchema * schema; STSchema * tagSchema; - SDataRow tagVal; + SKVRow tagVal; SMemTable * mem; SMemTable * imem; void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 9c8e57d18a..af4b9608b6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -506,11 +506,11 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) { return 0; } -int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) { +int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) { if (config->type != TSDB_CHILD_TABLE) return -1; if (dup) { - config->tagValues = tdDataRowDup(row); + config->tagValues = tdKVRowDup(row); } else { config->tagValues = row; } @@ -557,7 +557,7 @@ int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) { void tsdbClearTableCfg(STableCfg *config) { if (config->schema) tdFreeSchema(config->schema); if (config->tagSchema) tdFreeSchema(config->tagSchema); - if (config->tagValues) tdFreeDataRow(config->tagValues); + if (config->tagValues) kvRowFree(config->tagValues); tfree(config->name); tfree(config->sname); tfree(config->sql); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 0d9e6a9cf8..e1a9770df6 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -47,8 +47,7 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { ptr = tdEncodeSchema(ptr, pTable->schema); ptr = tdEncodeSchema(ptr, pTable->tagSchema); } else if (pTable->type == TSDB_CHILD_TABLE) { - tdTagRowCpy(ptr, pTable->tagVal); - ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen); + ptr = tdEncodeKVRow(ptr, pTable->tagVal); } else { ptr = tdEncodeSchema(ptr, pTable->schema); } @@ -94,8 +93,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { pTable->schema = tdDecodeSchema(&ptr); pTable->tagSchema = tdDecodeSchema(&ptr); } else if (pTable->type == TSDB_CHILD_TABLE) { - pTable->tagVal = tdTagRowDecode(ptr); - ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen); + ptr = tdDecodeKVRow(ptr, &pTable->tagVal); } else { pTable->schema = tdDecodeSchema(&ptr); } @@ -114,12 +112,9 @@ void tsdbFreeEncode(void *cont) { static char* getTagIndexKey(const void* pData) { STableIndexElem* elem = (STableIndexElem*) pData; - SDataRow row = elem->pTable->tagVal; STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - int16_t type = 0; - void * res = tdQueryTagByID(row, pCol->colId,&type); - ASSERT(type == pCol->type); + void * res = tdGetKVRowValOfCol(elem->pTable->tagVal, pCol->colId); return res; } @@ -271,9 +266,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t return -1; // No matched tags. Maybe the modification of tags has not been done yet. } - SDataRow row = (SDataRow)pTable->tagVal; - int16_t tagtype = 0; - char* d = tdQueryTagByID(row, pCol->colId, &tagtype); + char* d = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); //ASSERT((int8_t)tagtype == pCol->type) *val = d; *type = pCol->type; @@ -352,7 +345,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { if (pCfg->type == TSDB_CHILD_TABLE) { pTable->superUid = pCfg->superUid; - pTable->tagVal = tdDataRowDup(pCfg->tagValues); + pTable->tagVal = tdKVRowDup(pCfg->tagValues); } else if (pCfg->type == TSDB_NORMAL_TABLE) { pTable->superUid = -1; pTable->schema = tdDupSchema(pCfg->schema); @@ -487,7 +480,7 @@ static int tsdbFreeTable(STable *pTable) { if (pTable == NULL) return 0; if (pTable->type == TSDB_CHILD_TABLE) { - tdFreeTagRow(pTable->tagVal); + kvRowFree(pTable->tagVal); } else { tdFreeSchema(pTable->schema); } @@ -636,9 +629,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - int16_t tagtype = 0; - char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype); - ASSERT(pCol->type == tagtype); + char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); SArray* res = tSkipListGet(pSTable->pIndex, key); size_t size = taosArrayGetSize(res); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 2220ebfd88..88f6bb9efc 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1753,9 +1753,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); bytes = pCol->bytes; type = pCol->type; - int16_t tgtype1, tgtype2 = 0; - f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1); - f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2); + f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId); + f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId); } int32_t ret = doCompare(f1, f2, type, bytes); @@ -1843,12 +1842,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { val = (char*) elem->pTable->name; type = TSDB_DATA_TYPE_BINARY; } else { -// STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema?? - int16_t type; - // int32_t offset = pTSchema->columns[pInfo->colIndex].offset; - // val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset); - val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &type); - // ASSERT(pInfo->sch.type == type); + val = tdGetKVRowValOfCol(elem->pTable->tagVal, pInfo->sch.colId); } //todo :the val is possible to be null, so check it out carefully int32_t ret = 0; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 09cb2d3fac..1fa8abe379 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -139,16 +139,16 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; - //dataRow = tdNewDataRowFromSchema(pDestTagSchema); - dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags); + SKVRowBuilder kvRowBuilder; + tdInitKVRowBuilder(&kvRowBuilder); for (int i = 0; i < numOfTags; i++) { STColumn *pTCol = schemaColAt(pDestTagSchema, i); -// tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); - tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId); + tdAddColToKVRow(&kvRowBuilder, pTCol->colId, pTCol->type, pTagData + accumBytes); accumBytes += htons(pSchema[i + numOfColumns].bytes); } - tsdbTableSetTagValue(&tCfg, dataRow, false); + tsdbTableSetTagValue(&tCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); + tdDestroyKVRowBuilder(&kvRowBuilder); } // only normal has sql string From a3f36045ab3a448721a84bfa7ef1e123a317aea5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 06:56:47 +0000 Subject: [PATCH 11/13] resolve more conflict --- src/common/inc/tdataformat.h | 16 ++++++++++++++++ src/tsdb/src/tsdbMeta.c | 29 ++++++++++++----------------- src/tsdb/src/tsdbRWHelper.c | 2 +- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 706fdeff98..1d3319e61d 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -87,6 +87,22 @@ int tdGetSchemaEncodeSize(STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema); STSchema *tdDecodeSchema(void **psrc); +static FORCE_INLINE int comparColId(const void *key1, const void *key2) { + if (*(int16_t *)key1 > ((STColumn *)key2)->colId) { + return 1; + } else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) { + return -1; + } else { + return 0; + } +} + +static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { + void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId); + if (ptr == NULL) return NULL; + return (STColumn *)ptr; +} + // ----------------- Data row structure /* A data row, the format is like below: diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 32ad30598b..f057dcb96e 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -250,29 +250,24 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { STsdbMeta* pMeta = tsdbGetMeta(repo); STable* pTable = tsdbGetTableByUid(pMeta, id->uid); + + STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable); + STColumn *pCol = tdGetColOfID(pSchema, colId); + if (pCol == NULL) { + return -1; // No matched tag volumn + } - *val = tdQueryTagByID(pTable->tagVal, colId, type); + *val = tdGetKVRowValOfCol(pTable->tagVal, colId); + *type = pCol->type; if (*val != NULL) { - switch(*type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: *bytes = varDataLen(*val); break; - case TSDB_DATA_TYPE_NULL: *bytes = 0; break; - default: - *bytes = tDataTypeDesc[*type].nSize;break; + if (IS_VAR_DATA_TYPE(*type)) { + *bytes = varDataLen(*val); + } else { + *bytes = TYPE_BYTES[*type]; } } - if (pCol == NULL) { - return -1; // No matched tags. Maybe the modification of tags has not been done yet. - } - - char* d = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); - //ASSERT((int8_t)tagtype == pCol->type) - *val = d; - *type = pCol->type; - *bytes = pCol->bytes; - return TSDB_CODE_SUCCESS; } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 8c7f8309ec..5bdf37c81e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -453,7 +453,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer)*2); } buf = POINTER_SHIFT(pHelper->pBuffer, drift); - buf = taosEncodeVariant32(buf, i); + buf = taosEncodeVariantU32(buf, i); buf = tsdbEncodeSCompIdx(buf, pCompIdx); } } From 30d34fd25c4ef9f960e01c58abb4cb0c114e851d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 08:09:22 +0000 Subject: [PATCH 12/13] refactor create table --- src/inc/tsdb.h | 5 ++-- src/tsdb/src/tsdbMeta.c | 54 ++++++++++++++++++++++++++++++++++++++ src/vnode/src/vnodeWrite.c | 11 ++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 444f653810..4a954f7ea3 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -107,8 +107,9 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); void tsdbClearTableCfg(STableCfg *config); -int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val); -char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes); +int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val); +char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes); +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f057dcb96e..8d70789b67 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -438,6 +438,60 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { return pTable; } +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { + if (pMsg == NULL) return NULL; + SSchema *pSchema = (SSchema *)pMsg->data; + int16_t numOfCols = htons(pMsg->numOfColumns); + int16_t numOfTags = htons(pMsg->numOfTags); + + STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg)); + if (pCfg == NULL) return NULL; + + if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; + STSchema *pDSchema = tdNewSchema(numOfCols); + if (pDSchema == NULL) goto _err; + for (int i = 0; i < numOfCols; i++) { + tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err; + if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err; + + if (numOfTags > 0) { + STSchema *pTSchema = tdNewSchema(numOfTags); + for (int i = numOfCols; i < numOfCols + numOfTags; i++) { + tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err; + if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; + if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; + + char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + int accBytes = 0; + SKVRowBuilder kvRowBuilder; + + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err; + for (int i = 0; i < numOfTags; i++) { + STColumn *pCol = schemaColAt(pTSchema, i); + tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes); + accBytes += htons(pSchema[i+numOfCols].bytes); + } + tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); + tdDestroyKVRowBuilder(&kvRowBuilder); + } + + if (pMsg->tableType == TSDB_STREAM_TABLE) { + char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + tsdbTableSetStreamSql(pCfg, sql, true); + } + + return pCfg; + +_err: + tsdbClearTableCfg(pCfg); + tfree(pCfg); + return NULL; +} + // int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { int tsdbDropTable(TsdbRepoT *repo, STableId tableId) { STsdbRepo *pRepo = (STsdbRepo *)repo; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 1fa8abe379..5a3da2447f 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -104,6 +104,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR } static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { + + STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont); + if (pCfg == NULL) return terrno; + int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg); + + tsdbClearTableCfg(pCfg); + free(pCfg); + return code; + + #if 0 SMDCreateTableMsg *pTable = pCont; int32_t code = 0; @@ -165,6 +175,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code); return code; + #endif } static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { From bddcc6fa7786c756c3d070f43e0393b14c65fc8d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Jun 2020 08:18:16 +0000 Subject: [PATCH 13/13] refactor code --- src/vnode/src/vnodeWrite.c | 64 -------------------------------------- 1 file changed, 64 deletions(-) diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 5a3da2447f..90e2a482e9 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -112,70 +112,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe tsdbClearTableCfg(pCfg); free(pCfg); return code; - - #if 0 - SMDCreateTableMsg *pTable = pCont; - int32_t code = 0; - - vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId); - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t sid = htonl(pTable->sid); - uint64_t uid = htobe64(pTable->uid); - SSchema * pSchema = (SSchema *)pTable->data; - STSchema *pDestTagSchema = NULL; - SDataRow dataRow = NULL; - - int32_t totalCols = numOfColumns + numOfTags; - - STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid); - - STSchema *pDestSchema = tdNewSchema(numOfColumns); - for (int i = 0; i < numOfColumns; i++) { - tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetSchema(&tCfg, pDestSchema, false); - tsdbTableSetName(&tCfg, pTable->tableId, false); - - if (numOfTags != 0) { - pDestTagSchema = tdNewSchema(numOfTags); - for (int i = numOfColumns; i < totalCols; i++) { - tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); - tsdbTableSetSName(&tCfg, pTable->superTableId, false); - tsdbTableSetSuperUid(&tCfg, htobe64(pTable->superTableUid)); - - char *pTagData = pTable->data + totalCols * sizeof(SSchema); - int accumBytes = 0; - - SKVRowBuilder kvRowBuilder; - tdInitKVRowBuilder(&kvRowBuilder); - for (int i = 0; i < numOfTags; i++) { - STColumn *pTCol = schemaColAt(pDestTagSchema, i); - tdAddColToKVRow(&kvRowBuilder, pTCol->colId, pTCol->type, pTagData + accumBytes); - accumBytes += htons(pSchema[i + numOfColumns].bytes); - } - tsdbTableSetTagValue(&tCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); - tdDestroyKVRowBuilder(&kvRowBuilder); - } - - // only normal has sql string - if (pTable->tableType == TSDB_STREAM_TABLE) { - char *sql = pTable->data + totalCols * sizeof(SSchema); - vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql); - tsdbTableSetStreamSql(&tCfg, sql, false); - } - - code = tsdbCreateTable(pVnode->tsdb, &tCfg); - tdFreeDataRow(dataRow); - tfree(pDestTagSchema); - tfree(pDestSchema); - - vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code); - return code; - #endif } static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {