From 736ddfbbff3153a7d91eeadb98d902f7df05f11c Mon Sep 17 00:00:00 2001 From: localvar Date: Sun, 19 Apr 2020 18:52:24 +0800 Subject: [PATCH 1/7] TD-153: add exception handling --- src/util/inc/exception.h | 52 ++++++ src/util/inc/tbuffer.h | 206 ++++++++++++++--------- src/util/src/exception.c | 20 +++ src/util/src/tbuffer.c | 343 +++++++++++++++++++++++++++++++++++---- 4 files changed, 507 insertions(+), 114 deletions(-) create mode 100644 src/util/inc/exception.h create mode 100644 src/util/src/exception.c diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h new file mode 100644 index 0000000000..5c9506f802 --- /dev/null +++ b/src/util/inc/exception.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_EXCEPTION_H +#define TDENGINE_EXCEPTION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SExceptionNode { + struct SExceptionNode* prev; + jmp_buf jb; + int code; +} SExceptionNode; + +void expPushNode( SExceptionNode* node ); +int expPopNode(); +void expThrow( int code ); + +#define TRY do { \ + SExceptionNode expNode = { 0 }; \ + expPushNode( &expNode ); \ + if( setjmp(expNode.jb) == 0 ) { + +#define CATCH( code ) expPopNode(); \ + } else { \ + int code = expPopNode(); + +#define END_CATCH } } while( 0 ); + +#define THROW( x ) expThrow( (x) ) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 9bc0fd9f43..2d8ea732cc 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -16,122 +16,170 @@ #ifndef TDENGINE_TBUFFER_H #define TDENGINE_TBUFFER_H -#include "setjmp.h" -#include "os.h" +#include +#include #ifdef __cplusplus extern "C" { #endif /* -SBuffer can be used to read or write a buffer, but cannot be used for both -read & write at a same time. Below is an example: +// SBuffer can be used to read or write a buffer, but cannot be used for both +// read & write at a same time. Below is an example: +#include +#include +#include "exception.h" +#include "tbuffer.h" -int main(int argc, char** argv) { - //--------------------- write ------------------------ - SBuffer wbuf; - int32_t code = tbufBeginWrite(&wbuf); - if (code != 0) { - // handle errors - return 0; - } +int foo() { + SBuffer wbuf, rbuf; + tbufSetup(&wbuf, NULL, false); + tbufSetup(&rbuf, NULL, false); - // reserve 1024 bytes for the buffer to improve performance - tbufEnsureCapacity(&wbuf, 1024); + TRY { + //--------------------- write ------------------------ + tbufBeginWrite(&wbuf); + // reserve 1024 bytes for the buffer to improve performance + tbufEnsureCapacity(&wbuf, 1024); + // write 5 integers to the buffer + for (int i = 0; i < 5; i++) { + tbufWriteInt32(&wbuf, i); + } + // write a string to the buffer + tbufWriteString(&wbuf, "this is a string.\n"); + // acquire the result and close the write buffer + size_t size = tbufTell(&wbuf); + char* data = tbufGetData(&wbuf, true); - // write 5 integers to the buffer - for (int i = 0; i < 5; i++) { - tbufWriteInt32(&wbuf, i); - } + //------------------------ read ----------------------- + tbufBeginRead(&rbuf, data, size); + // read & print out 5 integers + for (int i = 0; i < 5; i++) { + printf("%d\n", tbufReadInt32(&rbuf)); + } + // read & print out a string + puts(tbufReadString(&rbuf, NULL)); + // try read another integer, this result in an error as there no this integer + tbufReadInt32(&rbuf); + printf("you should not see this message.\n"); + } CATCH( code ) { + printf("exception code is: %d, you will see this message after print out 5 integers and a string.\n", code); + THROW( code ); + } END_CATCH - // write a string to the buffer - tbufWriteString(&wbuf, "this is a string.\n"); - - // acquire the result and close the write buffer - size_t size = tbufTell(&wbuf); - char* data = tbufGetData(&wbuf, true); tbufClose(&wbuf, true); - - - //------------------------ read ----------------------- - SBuffer rbuf; - code = tbufBeginRead(&rbuf, data, size); - if (code != 0) { - printf("you will see this message after print out 5 integers and a string.\n"); - tbufClose(&rbuf, false); - return 0; - } - - // read & print out 5 integers - for (int i = 0; i < 5; i++) { - printf("%d\n", tbufReadInt32(&rbuf)); - } - - // read & print out a string - printf(tbufReadString(&rbuf, NULL)); - - // try read another integer, this result in an error as there no this integer - tbufReadInt32(&rbuf); - - printf("you should not see this message.\n"); tbufClose(&rbuf, false); - return 0; } + +int main(int argc, char** argv) { + TRY { + printf("in main: you will see this line\n"); + foo(); + printf("in main: you will not see this line\n"); + } CATCH( code ) { + printf("foo raise an exception with code %d\n", code); + } END_CATCH + + return 0; +} */ + typedef struct { - jmp_buf jb; + void* (*allocator)(void*, size_t); + bool endian; char* data; size_t pos; size_t size; } SBuffer; // common functions can be used in both read & write -#define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) + +// tbufSetup setup the buffer, should be called before tbufBeginRead / tbufBeginWrite +// *allocator*, function to allocate memory, will use 'realloc' if NULL +// *endian*, if true, read/write functions of primitive types will do 'ntoh' or 'hton' automatically +void tbufSetup(SBuffer* buf, void* (*allocator)(void*, size_t), bool endian); size_t tbufTell(SBuffer* buf); size_t tbufSeekTo(SBuffer* buf, size_t pos); -size_t tbufSkip(SBuffer* buf, size_t size); void tbufClose(SBuffer* buf, bool keepData); // basic read functions -#define tbufBeginRead(buf, _data, len) ((buf)->data = (char*)(_data), ((buf)->pos = 0), ((buf)->size = ((_data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) +void tbufBeginRead(SBuffer* buf, void* data, size_t len); +size_t tbufSkip(SBuffer* buf, size_t size); char* tbufRead(SBuffer* buf, size_t size); void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); const char* tbufReadString(SBuffer* buf, size_t* len); size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); +const char* tbufReadBinary(SBuffer* buf, size_t *len); +size_t tbufReadToBinary(SBuffer* buf, void* dst, size_t size); // basic write functions -#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) -void tbufEnsureCapacity(SBuffer* buf, size_t size); -char* tbufGetData(SBuffer* buf, bool takeOver); -void tbufWrite(SBuffer* buf, const void* data, size_t size); -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); -void tbufWriteString(SBuffer* buf, const char* str); +void tbufBeginWrite(SBuffer* buf); +void tbufEnsureCapacity(SBuffer* buf, size_t size); +size_t tbufReserve(SBuffer* buf, size_t size); +char* tbufGetData(SBuffer* buf, bool takeOver); +void tbufWrite(SBuffer* buf, const void* data, size_t size); +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); +void tbufWriteString(SBuffer* buf, const char* str); +// the prototype of WriteBinary and Write is identical +// the difference is: WriteBinary writes the length of the data to the buffer +// first, then the actual data, which means the reader don't need to know data +// size before read. Write only write the data itself, which means the reader +// need to know data size before read. +void tbufWriteBinary(SBuffer* buf, const void* data, size_t len); -// read & write function for primitive types -#ifndef TBUFFER_DEFINE_FUNCTION -#define TBUFFER_DEFINE_FUNCTION(type, name) \ - type tbufRead##name(SBuffer* buf); \ - void tbufWrite##name(SBuffer* buf, type data); \ - void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); -#endif +// read / write functions for primitive types +bool tbufReadBool(SBuffer* buf); +void tbufWriteBool(SBuffer* buf, bool data); +void tbufWriteBoolAt(SBuffer* buf, size_t pos, bool data); -TBUFFER_DEFINE_FUNCTION(bool, Bool) -TBUFFER_DEFINE_FUNCTION(char, Char) -TBUFFER_DEFINE_FUNCTION(int8_t, Int8) -TBUFFER_DEFINE_FUNCTION(uint8_t, Uint8) -TBUFFER_DEFINE_FUNCTION(int16_t, Int16) -TBUFFER_DEFINE_FUNCTION(uint16_t, Uint16) -TBUFFER_DEFINE_FUNCTION(int32_t, Int32) -TBUFFER_DEFINE_FUNCTION(uint32_t, Uint32) -TBUFFER_DEFINE_FUNCTION(int64_t, Int64) -TBUFFER_DEFINE_FUNCTION(uint64_t, Uint64) -TBUFFER_DEFINE_FUNCTION(float, Float) -TBUFFER_DEFINE_FUNCTION(double, Double) +char tbufReadChar(SBuffer* buf); +void tbufWriteChar(SBuffer* buf, char data); +void tbufWriteCharAt(SBuffer* buf, size_t pos, char data); + +int8_t tbufReadInt8(SBuffer* buf); +void tbufWriteInt8(SBuffer* buf, int8_t data); +void tbufWriteInt8At(SBuffer* buf, size_t pos, int8_t data); + +uint8_t tbufReadUint8(SBuffer* buf); +void tbufWriteUint8(SBuffer* buf, uint8_t data); +void tbufWriteUint8At(SBuffer* buf, size_t pos, uint8_t data); + +int16_t tbufReadInt16(SBuffer* buf); +void tbufWriteInt16(SBuffer* buf, int16_t data); +void tbufWriteInt16At(SBuffer* buf, size_t pos, int16_t data); + +uint16_t tbufReadUint16(SBuffer* buf); +void tbufWriteUint16(SBuffer* buf, uint16_t data); +void tbufWriteUint16At(SBuffer* buf, size_t pos, uint16_t data); + +int32_t tbufReadInt32(SBuffer* buf); +void tbufWriteInt32(SBuffer* buf, int32_t data); +void tbufWriteInt32At(SBuffer* buf, size_t pos, int32_t data); + +uint32_t tbufReadUint32(SBuffer* buf); +void tbufWriteUint32(SBuffer* buf, uint32_t data); +void tbufWriteUint32At(SBuffer* buf, size_t pos, uint32_t data); + +int64_t tbufReadInt64(SBuffer* buf); +void tbufWriteInt64(SBuffer* buf, int64_t data); +void tbufWriteInt64At(SBuffer* buf, size_t pos, int64_t data); + +uint64_t tbufReadUint64(SBuffer* buf); +void tbufWriteUint64(SBuffer* buf, uint64_t data); +void tbufWriteUint64At(SBuffer* buf, size_t pos, uint64_t data); + +float tbufReadFloat(SBuffer* buf); +void tbufWriteFloat(SBuffer* buf, float data); +void tbufWriteFloatAt(SBuffer* buf, size_t pos, float data); + +double tbufReadDouble(SBuffer* buf); +void tbufWriteDouble(SBuffer* buf, double data); +void tbufWriteDoubleAt(SBuffer* buf, size_t pos, double data); #ifdef __cplusplus } #endif -#endif \ No newline at end of file +#endif diff --git a/src/util/src/exception.c b/src/util/src/exception.c new file mode 100644 index 0000000000..6363aaebf6 --- /dev/null +++ b/src/util/src/exception.c @@ -0,0 +1,20 @@ +#include "exception.h" + + +static _Thread_local SExceptionNode* expList; + +void expPushNode( SExceptionNode* node ) { + node->prev = expList; + expList = node; +} + +int expPopNode() { + SExceptionNode* node = expList; + expList = node->prev; + return node->code; +} + +void expThrow( int code ) { + expList->code = code; + longjmp( expList->jb, 1 ); +} diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index a83d7dddb0..c254436a4e 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -16,47 +16,44 @@ #include #include #include - -#define TBUFFER_DEFINE_FUNCTION(type, name) \ - type tbufRead##name(SBuffer* buf) { \ - type ret; \ - tbufReadToBuffer(buf, &ret, sizeof(type)); \ - return ret; \ - }\ - void tbufWrite##name(SBuffer* buf, type data) {\ - tbufWrite(buf, &data, sizeof(data));\ - }\ - void tbufWrite##name##At(SBuffer* buf, size_t pos, type data) {\ - tbufWriteAt(buf, pos, &data, sizeof(data));\ - } - +#include #include "tbuffer.h" - +#include "exception.h" +#include //////////////////////////////////////////////////////////////////////////////// // common functions +void tbufSetup( + SBuffer* buf, + void* (*allocator)(void*, size_t), + bool endian +) { + if (allocator != NULL) { + buf->allocator = allocator; + } else { + buf->allocator = realloc; + } + + buf->endian = endian; +} + size_t tbufTell(SBuffer* buf) { return buf->pos; } size_t tbufSeekTo(SBuffer* buf, size_t pos) { if (pos > buf->size) { - // TODO: update error code, other tbufThrowError need to be changed too - tbufThrowError(buf, 1); + THROW( TSDB_CODE_MEMORY_CORRUPTED ); } size_t old = buf->pos; buf->pos = pos; return old; } -size_t tbufSkip(SBuffer* buf, size_t size) { - return tbufSeekTo(buf, buf->pos + size); -} - void tbufClose(SBuffer* buf, bool keepData) { if (!keepData) { - free(buf->data); + (*buf->allocator)(buf->data, 0); } buf->data = NULL; buf->pos = 0; @@ -66,6 +63,16 @@ void tbufClose(SBuffer* buf, bool keepData) { //////////////////////////////////////////////////////////////////////////////// // read functions +void tbufBeginRead(SBuffer* buf, void* data, size_t len) { + buf->data = data; + buf->pos = 0; + buf->size = (data == NULL) ? 0 : len; +} + +size_t tbufSkip(SBuffer* buf, size_t size) { + return tbufSeekTo(buf, buf->pos + size); +} + char* tbufRead(SBuffer* buf, size_t size) { char* ret = buf->data + buf->pos; tbufSkip(buf, size); @@ -78,8 +85,16 @@ void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { memcpy(dst, tbufRead(buf, size), size); } -const char* tbufReadString(SBuffer* buf, size_t* len) { +static size_t tbufReadLength(SBuffer* buf) { + // maximum length is 65535, if larger length is required + // this function and the corresponding write function need to be + // revised. uint16_t l = tbufReadUint16(buf); + return l; +} + +const char* tbufReadString(SBuffer* buf, size_t* len) { + size_t l = tbufReadLength(buf); char* ret = buf->data + buf->pos; tbufSkip(buf, l + 1); ret[l] = 0; // ensure the string end with '\0' @@ -101,23 +116,55 @@ size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { return len; } +const char* tbufReadBinary(SBuffer* buf, size_t *len) { + size_t l = tbufReadLength(buf); + char* ret = buf->data + buf->pos; + tbufSkip(buf, l); + if (len != NULL) { + *len = l; + } + return ret; +} + +size_t tbufReadToBinary(SBuffer* buf, void* dst, size_t size) { + assert(dst != NULL); + size_t len; + const char* data = tbufReadBinary(buf, &len); + if (len >= size) { + len = size; + } + memcpy(dst, data, len); + return len; +} //////////////////////////////////////////////////////////////////////////////// // write functions +void tbufBeginWrite(SBuffer* buf) { + buf->data = NULL; + buf->pos = 0; + buf->size = 0; +} + void tbufEnsureCapacity(SBuffer* buf, size_t size) { size += buf->pos; if (size > buf->size) { size_t nsize = size + buf->size; - char* data = realloc(buf->data, nsize); + char* data = (*buf->allocator)(buf->data, nsize); if (data == NULL) { - tbufThrowError(buf, 2); + // TODO: handle client out of memory + THROW( TSDB_CODE_SERV_OUT_OF_MEMORY ); } buf->data = data; buf->size = nsize; } } +size_t tbufReserve(SBuffer* buf, size_t size) { + tbufEnsureCapacity(buf, size); + return tbufSeekTo(buf, buf->pos + size); +} + char* tbufGetData(SBuffer* buf, bool takeOver) { char* ret = buf->data; if (takeOver) { @@ -129,13 +176,6 @@ char* tbufGetData(SBuffer* buf, bool takeOver) { return ret; } -void tbufEndWrite(SBuffer* buf) { - free(buf->data); - buf->data = NULL; - buf->pos = 0; - buf->size = 0; -} - void tbufWrite(SBuffer* buf, const void* data, size_t size) { assert(data != NULL); tbufEnsureCapacity(buf, size); @@ -151,15 +191,248 @@ void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { memcpy(buf->data + pos, data, size); } -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { - // maximum string length is 65535, if longer string is required +static void tbufWriteLength(SBuffer* buf, size_t len) { + // maximum length is 65535, if larger length is required // this function and the corresponding read function need to be // revised. assert(len <= 0xffff); tbufWriteUint16(buf, (uint16_t)len); - tbufWrite(buf, str, len + 1); +} + +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { + tbufWriteLength(buf, len); + tbufWrite(buf, str, len); + tbufWriteChar(buf, '\0'); } void tbufWriteString(SBuffer* buf, const char* str) { tbufWriteStringLen(buf, str, strlen(str)); } + +void tbufWriteBinary(SBuffer* buf, const void* data, size_t len) { + tbufWriteLength(buf, len); + tbufWrite(buf, data, len); +} + +//////////////////////////////////////////////////////////////////////////////// +// read / write functions for primitive types + +bool tbufReadBool(SBuffer* buf) { + bool ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + return ret; +} + +void tbufWriteBool(SBuffer* buf, bool data) { + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteBoolAt(SBuffer* buf, size_t pos, bool data) { + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +char tbufReadChar(SBuffer* buf) { + char ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + return ret; +} + +void tbufWriteChar(SBuffer* buf, char data) { + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteCharAt(SBuffer* buf, size_t pos, char data) { + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +int8_t tbufReadInt8(SBuffer* buf) { + int8_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + return ret; +} + +void tbufWriteInt8(SBuffer* buf, int8_t data) { + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteInt8At(SBuffer* buf, size_t pos, int8_t data) { + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +uint8_t tbufReadUint8(SBuffer* buf) { + uint8_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + return ret; +} + +void tbufWriteUint8(SBuffer* buf, uint8_t data) { + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteUint8At(SBuffer* buf, size_t pos, uint8_t data) { + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +int16_t tbufReadInt16(SBuffer* buf) { + int16_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return (int16_t)ntohs(ret); + } + return ret; +} + +void tbufWriteInt16(SBuffer* buf, int16_t data) { + if (buf->endian) { + data = (int16_t)htons(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteInt16At(SBuffer* buf, size_t pos, int16_t data) { + if (buf->endian) { + data = (int16_t)htons(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +uint16_t tbufReadUint16(SBuffer* buf) { + uint16_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return ntohs(ret); + } + return ret; +} + +void tbufWriteUint16(SBuffer* buf, uint16_t data) { + if (buf->endian) { + data = htons(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteUint16At(SBuffer* buf, size_t pos, uint16_t data) { + if (buf->endian) { + data = htons(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +int32_t tbufReadInt32(SBuffer* buf) { + int32_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return (int32_t)ntohl(ret); + } + return ret; +} + +void tbufWriteInt32(SBuffer* buf, int32_t data) { + if (buf->endian) { + data = (int32_t)htonl(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteInt32At(SBuffer* buf, size_t pos, int32_t data) { + if (buf->endian) { + data = (int32_t)htonl(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +uint32_t tbufReadUint32(SBuffer* buf) { + uint32_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return ntohl(ret); + } + return ret; +} + +void tbufWriteUint32(SBuffer* buf, uint32_t data) { + if (buf->endian) { + data = htonl(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteUint32At(SBuffer* buf, size_t pos, uint32_t data) { + if (buf->endian) { + data = htonl(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +int64_t tbufReadInt64(SBuffer* buf) { + int64_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return (int64_t)htobe64(ret); // TODO: ntohll + } + return ret; +} + +void tbufWriteInt64(SBuffer* buf, int64_t data) { + if (buf->endian) { + data = (int64_t)htobe64(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteInt64At(SBuffer* buf, size_t pos, int64_t data) { + if (buf->endian) { + data = (int64_t)htobe64(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +uint64_t tbufReadUint64(SBuffer* buf) { + uint64_t ret; + tbufReadToBuffer(buf, &ret, sizeof(ret)); + if (buf->endian) { + return htobe64(ret); // TODO: ntohll + } + return ret; +} + +void tbufWriteUint64(SBuffer* buf, uint64_t data) { + if (buf->endian) { + data = htobe64(data); + } + tbufWrite(buf, &data, sizeof(data)); +} + +void tbufWriteUint64At(SBuffer* buf, size_t pos, uint64_t data) { + if (buf->endian) { + data = htobe64(data); + } + tbufWriteAt(buf, pos, &data, sizeof(data)); +} + +float tbufReadFloat(SBuffer* buf) { + uint32_t ret = tbufReadUint32(buf); + return *(float*)(&ret); +} + +void tbufWriteFloat(SBuffer* buf, float data) { + tbufWriteUint32(buf, *(uint32_t*)(&data)); +} + +void tbufWriteFloatAt(SBuffer* buf, size_t pos, float data) { + tbufWriteUint32At(buf, pos, *(uint32_t*)(&data)); +} + +double tbufReadDouble(SBuffer* buf) { + uint64_t ret = tbufReadUint64(buf); + return *(double*)(&ret); +} + +void tbufWriteDouble(SBuffer* buf, double data) { + tbufWriteUint64(buf, *(uint64_t*)(&data)); +} + +void tbufWriteDoubleAt(SBuffer* buf, size_t pos, double data) { + tbufWriteUint64At(buf, pos, *(uint64_t*)(&data)); +} From 4826a1851c56471a5aa19f23ba7e9cf7bb1a2ebc Mon Sep 17 00:00:00 2001 From: localvar Date: Fri, 24 Apr 2020 15:37:34 +0800 Subject: [PATCH 2/7] TD-153: add defer support --- src/util/inc/exception.h | 70 +++++++++++++++++++++++++++++++++------- src/util/src/exception.c | 29 +++++++++++++++-- 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h index 5c9506f802..19c37561d2 100644 --- a/src/util/inc/exception.h +++ b/src/util/inc/exception.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 TAOS Data, Inc. + * Copyright (c) 2020 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -17,33 +17,81 @@ #define TDENGINE_EXCEPTION_H #include +#include +#include #ifdef __cplusplus extern "C" { #endif +/* + * exception handling + */ typedef struct SExceptionNode { struct SExceptionNode* prev; jmp_buf jb; int code; } SExceptionNode; -void expPushNode( SExceptionNode* node ); -int expPopNode(); -void expThrow( int code ); +void exceptionPushNode( SExceptionNode* node ); +int exceptionPopNode(); +void exceptionThrow( int code ); + +#define THROW( x ) exceptionThrow( (x) ) +#define CAUGHT_EXCEPTION() (caught_exception == 1) #define TRY do { \ SExceptionNode expNode = { 0 }; \ - expPushNode( &expNode ); \ - if( setjmp(expNode.jb) == 0 ) { + exceptionPushNode( &expNode ); \ + int caught_exception = setjmp(expNode.jb); \ + if( caught_exception == 0 ) -#define CATCH( code ) expPopNode(); \ - } else { \ - int code = expPopNode(); +#define CATCH( code ) int code = exceptionPopNode(); \ + if( caught_exception == 1 ) -#define END_CATCH } } while( 0 ); +#define FINALLY( code ) int code = exceptionPopNode(); -#define THROW( x ) expThrow( (x) ) +#define END_TRY } while( 0 ); + + +/* + * defered operations + */ +typedef struct SDeferedOperation { + void (*wrapper)( struct SDeferedOperation* dp ); + void* func; + void* arg; +} SDeferedOperation; + +void deferExecute( SDeferedOperation* operations, unsigned int numOfOperations ); +void deferWrapper_void_void( SDeferedOperation* dp ); +void deferWrapper_void_ptr( SDeferedOperation* dp ); +void deferWrapper_int_int( SDeferedOperation* dp ); + +#define DEFER_INIT( MaxOperations ) unsigned int maxDeferedOperations = MaxOperations, numOfDeferedOperations = 0; \ + SDeferedOperation deferedOperations[MaxOperations] + +#define DEFER_PUSH( wrapperFunc, deferedFunc, argument ) do { \ + assert( numOfDeferedOperations < maxDeferedOperations ); \ + SDeferedOperation* dp = deferedOperations + numOfDeferedOperations++; \ + dp->wrapper = wrapperFunc; \ + dp->func = (void*)deferedFunc; \ + dp->arg = (void*)argument; \ +} while( 0 ) + +#define DEFER_POP() do { --numOfDeferedOperations; } while( 0 ) + +#define DEFER_EXECUTE() do{ \ + deferExecute( deferedOperations, numOfDeferedOperations ); \ + numOfDeferedOperations = 0; \ +} while( 0 ) + +#define DEFER_PUSH_VOID_PTR( func, arg ) DEFER_PUSH( deferWrapper_void_ptr, func, arg ) +#define DEFER_PUSH_INT_INT( func, arg ) DEFER_PUSH( deferWrapper_int_int, func, arg ) +#define DEFER_PUSH_VOID_VOID( func ) DEFER_PUSH( deferWrapper_void_void, func, 0 ) + +#define DEFER_PUSH_FREE( arg ) DEFER_PUSH( deferWrapper_void_ptr, free, arg ) +#define DEFER_PUSH_CLOSE( arg ) DEFER_PUSH( deferWrapper_int_int, close, arg ) #ifdef __cplusplus } diff --git a/src/util/src/exception.c b/src/util/src/exception.c index 6363aaebf6..45ebd349a5 100644 --- a/src/util/src/exception.c +++ b/src/util/src/exception.c @@ -3,18 +3,41 @@ static _Thread_local SExceptionNode* expList; -void expPushNode( SExceptionNode* node ) { +void exceptionPushNode( SExceptionNode* node ) { node->prev = expList; expList = node; } -int expPopNode() { +int exceptionPopNode() { SExceptionNode* node = expList; expList = node->prev; return node->code; } -void expThrow( int code ) { +void exceptionThrow( int code ) { expList->code = code; longjmp( expList->jb, 1 ); } + +void deferWrapper_void_ptr( SDeferedOperation* dp ) { + void (*func)( void* ) = dp->func; + func( dp->arg ); +} + +void deferWrapper_int_int( SDeferedOperation* dp ) { + int (*func)( int ) = dp->func; + func( (int)(intptr_t)(dp->arg) ); +} + +void deferWrapper_void_void( SDeferedOperation* dp ) { + void (*func)() = dp->func; + func(); +} + +void deferExecute( SDeferedOperation* operations, unsigned int numOfOperations ) { + while( numOfOperations > 0 ) { + --numOfOperations; + SDeferedOperation* dp = operations + numOfOperations; + dp->wrapper( dp ); + } +} From c24f7ea7bf1a3fc404c502682a85671fe33b66b0 Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 27 Apr 2020 08:03:32 +0800 Subject: [PATCH 3/7] update defer and rename it to cleanup --- src/util/inc/exception.h | 116 ++++++++++++++++++++++----------------- src/util/src/exception.c | 108 +++++++++++++++++++++++++++++++----- 2 files changed, 159 insertions(+), 65 deletions(-) diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h index 19c37561d2..229ba89d04 100644 --- a/src/util/inc/exception.h +++ b/src/util/inc/exception.h @@ -25,73 +25,87 @@ extern "C" { #endif /* - * exception handling + * cleanup actions + */ +typedef struct SCleanupAction { + bool failOnly; + uint8_t wrapper; + uint16_t reserved; + void* func; + union { + void* Ptr; + bool Bool; + char Char; + int8_t Int8; + uint8_t Uint8; + int16_t Int16; + uint16_t Uint16; + int Int; + unsigned int Uint; + int32_t Int32; + uint32_t Uint32; + int64_t Int64; + uint64_t Uint64; + float Float; + double Double; + } arg1, arg2; +} SCleanupAction; + +void cleanupPush_void_ptr_ptr ( bool failOnly, void* func, void* arg1, void* arg2 ); +void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool arg2 ); +void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); +void cleanupPush_int_int ( bool failOnly, void* func, int arg ); +void cleanupPush_void ( bool failOnly, void* func ); + +int32_t cleanupGetActionCount(); +void cleanupExecute( bool failed, int32_t toIndex ); + +#define CLEANUP_PUSH_VOID_PTR_PTR( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_ptr( (failOnly), (void*)(func), (void*)(arg1), (void*)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR_BOOL( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_bool( (failOnly), (void*)(func), (void*)(arg1), (bool)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) +#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) +#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) +#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) +#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) + +#define CLEANUP_CREATE_ANCHOR() int32_t cleanupAnchor = cleanupGetActionCount() +#define CLEANUP_EXECUTE( failed ) cleanupExecute( cleanupAnchor, (failed) ) + +/* + * exception hander registration */ typedef struct SExceptionNode { struct SExceptionNode* prev; jmp_buf jb; - int code; + int32_t code; + int32_t maxCleanupAction; + int32_t numCleanupAction; + SCleanupAction* cleanupActions; } SExceptionNode; void exceptionPushNode( SExceptionNode* node ); -int exceptionPopNode(); +int32_t exceptionPopNode(); void exceptionThrow( int code ); -#define THROW( x ) exceptionThrow( (x) ) -#define CAUGHT_EXCEPTION() (caught_exception == 1) - -#define TRY do { \ - SExceptionNode expNode = { 0 }; \ - exceptionPushNode( &expNode ); \ - int caught_exception = setjmp(expNode.jb); \ - if( caught_exception == 0 ) +#define TRY(maxCleanupActions) do { \ + SExceptionNode exceptionNode = { 0 }; \ + SDeferedOperation cleanupActions[maxCleanupActions > 0 ? maxCleanupActions : 1]; \ + exceptionNode.maxCleanupAction = maxCleanupActions > 0 ? maxDefered : 1; \ + exceptionNode.cleanupActions = cleanupActions; \ + int32_t cleanupAnchor = 0; \ + exceptionPushNode( &exceptionNode ); \ + int caughtException = setjmp( exceptionNode.jb ); \ + if( caughtException == 0 ) #define CATCH( code ) int code = exceptionPopNode(); \ - if( caught_exception == 1 ) + if( caughtEexception == 1 ) #define FINALLY( code ) int code = exceptionPopNode(); #define END_TRY } while( 0 ); - -/* - * defered operations - */ -typedef struct SDeferedOperation { - void (*wrapper)( struct SDeferedOperation* dp ); - void* func; - void* arg; -} SDeferedOperation; - -void deferExecute( SDeferedOperation* operations, unsigned int numOfOperations ); -void deferWrapper_void_void( SDeferedOperation* dp ); -void deferWrapper_void_ptr( SDeferedOperation* dp ); -void deferWrapper_int_int( SDeferedOperation* dp ); - -#define DEFER_INIT( MaxOperations ) unsigned int maxDeferedOperations = MaxOperations, numOfDeferedOperations = 0; \ - SDeferedOperation deferedOperations[MaxOperations] - -#define DEFER_PUSH( wrapperFunc, deferedFunc, argument ) do { \ - assert( numOfDeferedOperations < maxDeferedOperations ); \ - SDeferedOperation* dp = deferedOperations + numOfDeferedOperations++; \ - dp->wrapper = wrapperFunc; \ - dp->func = (void*)deferedFunc; \ - dp->arg = (void*)argument; \ -} while( 0 ) - -#define DEFER_POP() do { --numOfDeferedOperations; } while( 0 ) - -#define DEFER_EXECUTE() do{ \ - deferExecute( deferedOperations, numOfDeferedOperations ); \ - numOfDeferedOperations = 0; \ -} while( 0 ) - -#define DEFER_PUSH_VOID_PTR( func, arg ) DEFER_PUSH( deferWrapper_void_ptr, func, arg ) -#define DEFER_PUSH_INT_INT( func, arg ) DEFER_PUSH( deferWrapper_int_int, func, arg ) -#define DEFER_PUSH_VOID_VOID( func ) DEFER_PUSH( deferWrapper_void_void, func, 0 ) - -#define DEFER_PUSH_FREE( arg ) DEFER_PUSH( deferWrapper_void_ptr, free, arg ) -#define DEFER_PUSH_CLOSE( arg ) DEFER_PUSH( deferWrapper_int_int, close, arg ) +#define THROW( x ) exceptionThrow( (x) ) +#define CAUGHT_EXCEPTION() ((bool)(caughtEexception == 1)) #ifdef __cplusplus } diff --git a/src/util/src/exception.c b/src/util/src/exception.c index 45ebd349a5..b0e8fce371 100644 --- a/src/util/src/exception.c +++ b/src/util/src/exception.c @@ -8,7 +8,7 @@ void exceptionPushNode( SExceptionNode* node ) { expList = node; } -int exceptionPopNode() { +int32_t exceptionPopNode() { SExceptionNode* node = expList; expList = node->prev; return node->code; @@ -19,25 +19,105 @@ void exceptionThrow( int code ) { longjmp( expList->jb, 1 ); } -void deferWrapper_void_ptr( SDeferedOperation* dp ) { - void (*func)( void* ) = dp->func; - func( dp->arg ); + + +static void cleanupWrapper_void_ptr_ptr( SCleanupAction* ca ) { + void (*func)( void*, void* ) = ac->func; + func( ca->arg1.Ptr, ca->arg2.Ptr ); } -void deferWrapper_int_int( SDeferedOperation* dp ) { - int (*func)( int ) = dp->func; - func( (int)(intptr_t)(dp->arg) ); +static void cleanupWrapper_void_ptr_bool( SCleanupAction* ca ) { + void (*func)( void*, bool ) = ca->func; + func( ca->arg1.Ptr, ca->arg2.Bool ); } -void deferWrapper_void_void( SDeferedOperation* dp ) { - void (*func)() = dp->func; +static void cleanupWrapper_void_ptr( SCleanupAction* ca ) { + void (*func)( void* ) = ca->func; + func( ca->arg1.Ptr ); +} + +static void cleanupWrapper_int_int( SCleanupAction* ca ) { + int (*func)( int ) = ca->func; + func( (int)(intptr_t)(ca->arg1.Int) ); +} + +static void cleanupWrapper_void_void( SCleanupAction* ca ) { + void (*func)() = ca->func; func(); } -void deferExecute( SDeferedOperation* operations, unsigned int numOfOperations ) { - while( numOfOperations > 0 ) { - --numOfOperations; - SDeferedOperation* dp = operations + numOfOperations; - dp->wrapper( dp ); +static void (*wrappers)(SCleanupAction*)[] = { + cleanupWrapper_void_ptr_ptr, + cleanupWrapper_void_ptr_bool, + cleanupWrapper_void_ptr, + cleanupWrapper_int_int, + cleanupWrapper_void_void, +}; + + +void cleanupPush_void_ptr_ptr( bool failOnly, void* func, void* arg1, void* arg2 ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; + ac->wrapper = 0; + ac->failOnly = failOnly; + ac->func = func; + ac->arg1.Ptr = arg1; + ac->arg2.Ptr = arg2; +} + +void cleanupPush_void_ptr_bool( bool failOnly, void* func, void* arg1, bool arg2 ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; + ac->wrapper = 1; + ac->failOnly = failOnly; + ac->func = func; + ac->arg1.Ptr = arg1; + ac->arg2.Bool = arg2; +} + +void cleanupPush_void_ptr( bool failOnly, void* func, void* arg ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; + ac->wrapper = 2; + ac->failOnly = failOnly; + ac->func = func; + ac->arg1.Ptr = arg1; +} + +void cleanupPush_int_int( bool failOnly, void* func, int arg ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; + ac->wrapper = 3; + ac->failOnly = failOnly; + ac->func = func; + ac->arg1.Int = arg; +} + +void cleanupPush_void( bool failOnly, void* func ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; + ac->wrapper = 4; + ac->failOnly = failOnly; + ac->func = func; +} + + + +int32 cleanupGetActionCount() { + return expList->numCleanupAction; +} + + +void cleanupExecute( int32_t anchor, bool failed ) { + while( expList->numCleanupAction > anchor ) { + --expList->numCleanupAction; + SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction; + if( failed || !(ac->failOnly) ) + ac->wrapper( ac ); } } From 058c8912e303bf389aa21f6a60256d18e0a21a43 Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 27 Apr 2020 08:17:54 +0800 Subject: [PATCH 4/7] fix compile errors --- src/util/inc/exception.h | 3 +- src/util/src/exception.c | 65 ++++++++++++++++++++-------------------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h index 229ba89d04..32e2fcb61b 100644 --- a/src/util/inc/exception.h +++ b/src/util/inc/exception.h @@ -18,6 +18,7 @@ #include #include +#include #include #ifdef __cplusplus @@ -58,7 +59,7 @@ void cleanupPush_int_int ( bool failOnly, void* func, int arg ); void cleanupPush_void ( bool failOnly, void* func ); int32_t cleanupGetActionCount(); -void cleanupExecute( bool failed, int32_t toIndex ); +void cleanupExecute( int32_t anchor, bool failed ); #define CLEANUP_PUSH_VOID_PTR_PTR( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_ptr( (failOnly), (void*)(func), (void*)(arg1), (void*)(arg2) ) #define CLEANUP_PUSH_VOID_PTR_BOOL( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_bool( (failOnly), (void*)(func), (void*)(arg1), (bool)(arg2) ) diff --git a/src/util/src/exception.c b/src/util/src/exception.c index b0e8fce371..27cf6fbcd6 100644 --- a/src/util/src/exception.c +++ b/src/util/src/exception.c @@ -22,7 +22,7 @@ void exceptionThrow( int code ) { static void cleanupWrapper_void_ptr_ptr( SCleanupAction* ca ) { - void (*func)( void*, void* ) = ac->func; + void (*func)( void*, void* ) = ca->func; func( ca->arg1.Ptr, ca->arg2.Ptr ); } @@ -46,7 +46,8 @@ static void cleanupWrapper_void_void( SCleanupAction* ca ) { func(); } -static void (*wrappers)(SCleanupAction*)[] = { +typedef void (*wrapper)(SCleanupAction*); +static wrapper wrappers[] = { cleanupWrapper_void_ptr_ptr, cleanupWrapper_void_ptr_bool, cleanupWrapper_void_ptr, @@ -58,57 +59,57 @@ static void (*wrappers)(SCleanupAction*)[] = { void cleanupPush_void_ptr_ptr( bool failOnly, void* func, void* arg1, void* arg2 ) { assert( expList->numCleanupAction < expList->maxCleanupAction ); - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; - ac->wrapper = 0; - ac->failOnly = failOnly; - ac->func = func; - ac->arg1.Ptr = arg1; - ac->arg2.Ptr = arg2; + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 0; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg1; + ca->arg2.Ptr = arg2; } void cleanupPush_void_ptr_bool( bool failOnly, void* func, void* arg1, bool arg2 ) { assert( expList->numCleanupAction < expList->maxCleanupAction ); - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; - ac->wrapper = 1; - ac->failOnly = failOnly; - ac->func = func; - ac->arg1.Ptr = arg1; - ac->arg2.Bool = arg2; + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 1; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg1; + ca->arg2.Bool = arg2; } void cleanupPush_void_ptr( bool failOnly, void* func, void* arg ) { assert( expList->numCleanupAction < expList->maxCleanupAction ); - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; - ac->wrapper = 2; - ac->failOnly = failOnly; - ac->func = func; - ac->arg1.Ptr = arg1; + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 2; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg; } void cleanupPush_int_int( bool failOnly, void* func, int arg ) { assert( expList->numCleanupAction < expList->maxCleanupAction ); - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; - ac->wrapper = 3; - ac->failOnly = failOnly; - ac->func = func; - ac->arg1.Int = arg; + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 3; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Int = arg; } void cleanupPush_void( bool failOnly, void* func ) { assert( expList->numCleanupAction < expList->maxCleanupAction ); - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction++; - ac->wrapper = 4; - ac->failOnly = failOnly; - ac->func = func; + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 4; + ca->failOnly = failOnly; + ca->func = func; } -int32 cleanupGetActionCount() { +int32_t cleanupGetActionCount() { return expList->numCleanupAction; } @@ -116,8 +117,8 @@ int32 cleanupGetActionCount() { void cleanupExecute( int32_t anchor, bool failed ) { while( expList->numCleanupAction > anchor ) { --expList->numCleanupAction; - SCleanupAction *ac = expList->cleanupActions + expList->numCleanupAction; - if( failed || !(ac->failOnly) ) - ac->wrapper( ac ); + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction; + if( failed || !(ca->failOnly) ) + wrappers[ca->wrapper]( ca ); } } From 9fdf34552b8eb5ba111ec128c2777e44f30da24f Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 27 Apr 2020 09:16:56 +0800 Subject: [PATCH 5/7] split SBuffer to Reader & Writer --- src/util/inc/tbuffer.h | 230 +++++++------------ src/util/src/tbuffer.c | 499 +++++++++++++++++++---------------------- 2 files changed, 316 insertions(+), 413 deletions(-) diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 2d8ea732cc..103b3710cf 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -23,160 +23,102 @@ extern "C" { #endif -/* -// SBuffer can be used to read or write a buffer, but cannot be used for both -// read & write at a same time. Below is an example: -#include -#include -#include "exception.h" -#include "tbuffer.h" - -int foo() { - SBuffer wbuf, rbuf; - tbufSetup(&wbuf, NULL, false); - tbufSetup(&rbuf, NULL, false); - - TRY { - //--------------------- write ------------------------ - tbufBeginWrite(&wbuf); - // reserve 1024 bytes for the buffer to improve performance - tbufEnsureCapacity(&wbuf, 1024); - // write 5 integers to the buffer - for (int i = 0; i < 5; i++) { - tbufWriteInt32(&wbuf, i); - } - // write a string to the buffer - tbufWriteString(&wbuf, "this is a string.\n"); - // acquire the result and close the write buffer - size_t size = tbufTell(&wbuf); - char* data = tbufGetData(&wbuf, true); - - //------------------------ read ----------------------- - tbufBeginRead(&rbuf, data, size); - // read & print out 5 integers - for (int i = 0; i < 5; i++) { - printf("%d\n", tbufReadInt32(&rbuf)); - } - // read & print out a string - puts(tbufReadString(&rbuf, NULL)); - // try read another integer, this result in an error as there no this integer - tbufReadInt32(&rbuf); - printf("you should not see this message.\n"); - } CATCH( code ) { - printf("exception code is: %d, you will see this message after print out 5 integers and a string.\n", code); - THROW( code ); - } END_CATCH - - tbufClose(&wbuf, true); - tbufClose(&rbuf, false); - return 0; -} - -int main(int argc, char** argv) { - TRY { - printf("in main: you will see this line\n"); - foo(); - printf("in main: you will not see this line\n"); - } CATCH( code ) { - printf("foo raise an exception with code %d\n", code); - } END_CATCH - - return 0; -} -*/ +typedef struct { + bool endian; + const char* data; + size_t pos; + size_t size; +} SBufferReader; typedef struct { - void* (*allocator)(void*, size_t); - bool endian; - char* data; - size_t pos; - size_t size; -} SBuffer; + bool endian; + char* data; + size_t pos; + size_t size; + void* (*allocator)( void*, size_t ); +} SBufferWriter; -// common functions can be used in both read & write +//////////////////////////////////////////////////////////////////////////////// +// common functions & macros for both reader & writer +#define tbufTell( buf ) ((buf)->pos) -// tbufSetup setup the buffer, should be called before tbufBeginRead / tbufBeginWrite -// *allocator*, function to allocate memory, will use 'realloc' if NULL -// *endian*, if true, read/write functions of primitive types will do 'ntoh' or 'hton' automatically -void tbufSetup(SBuffer* buf, void* (*allocator)(void*, size_t), bool endian); -size_t tbufTell(SBuffer* buf); -size_t tbufSeekTo(SBuffer* buf, size_t pos); -void tbufClose(SBuffer* buf, bool keepData); -// basic read functions -void tbufBeginRead(SBuffer* buf, void* data, size_t len); -size_t tbufSkip(SBuffer* buf, size_t size); -char* tbufRead(SBuffer* buf, size_t size); -void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); -const char* tbufReadString(SBuffer* buf, size_t* len); -size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); -const char* tbufReadBinary(SBuffer* buf, size_t *len); -size_t tbufReadToBinary(SBuffer* buf, void* dst, size_t size); +//////////////////////////////////////////////////////////////////////////////// +// reader functions & macros -// basic write functions -void tbufBeginWrite(SBuffer* buf); -void tbufEnsureCapacity(SBuffer* buf, size_t size); -size_t tbufReserve(SBuffer* buf, size_t size); -char* tbufGetData(SBuffer* buf, bool takeOver); -void tbufWrite(SBuffer* buf, const void* data, size_t size); -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); -void tbufWriteString(SBuffer* buf, const char* str); -// the prototype of WriteBinary and Write is identical -// the difference is: WriteBinary writes the length of the data to the buffer +// *Endian*, if true, reader functions of primitive types will do 'ntoh' automatically +#define tbufInitReader( Data, Size, Endian ) {.endian = (Endian), .data = (Data), .pos = 0, .size = ((Data) == NULL ? 0 :(Size))} + +size_t tbufSkip( SBufferReader* buf, size_t size ); + +char* tbufRead( SBufferReader* buf, size_t size ); +void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ); +const char* tbufReadString( SBufferReader* buf, size_t* len ); +size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ); +const char* tbufReadBinary( SBufferReader* buf, size_t *len ); +size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size ); + +bool tbufReadBool( SBufferReader* buf ); +char tbufReadChar( SBufferReader* buf ); +int8_t tbufReadInt8( SBufferReader* buf ); +uint8_t tbufReadUint8( SBufferReader* buf ); +int16_t tbufReadInt16( SBufferReader* buf ); +uint16_t tbufReadUint16( SBufferReader* buf ); +int32_t tbufReadInt32( SBufferReader* buf ); +uint32_t tbufReadUint32( SBufferReader* buf ); +int64_t tbufReadInt64( SBufferReader* buf ); +uint64_t tbufReadUint64( SBufferReader* buf ); +float tbufReadFloat( SBufferReader* buf ); +double tbufReadDouble( SBufferReader* buf ); + + +//////////////////////////////////////////////////////////////////////////////// +// writer functions & macros + +// *Allocator*, function to allocate memory, will use 'realloc' if NULL +// *Endian*, if true, writer functions of primitive types will do 'hton' automatically +#define tbufInitWriter( Allocator, Endian ) {.endian = (Endian), .data = NULL, .pos = 0, .size = 0, .allocator = ((Allocator) == NULL ? realloc : (Allocator))} +void tbufCloseWriter( SBufferWriter* buf ); + +void tbufEnsureCapacity( SBufferWriter* buf, size_t size ); +size_t tbufReserve( SBufferWriter* buf, size_t size ); +char* tbufGetData( SBufferWriter* buf, bool takeOver ); + +void tbufWrite( SBufferWriter* buf, const void* data, size_t size ); +void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ); +void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ); +void tbufWriteString( SBufferWriter* buf, const char* str ); +// the prototype of tbufWriteBinary and tbufWrite are identical +// the difference is: tbufWriteBinary writes the length of the data to the buffer // first, then the actual data, which means the reader don't need to know data // size before read. Write only write the data itself, which means the reader // need to know data size before read. -void tbufWriteBinary(SBuffer* buf, const void* data, size_t len); +void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ); -// read / write functions for primitive types -bool tbufReadBool(SBuffer* buf); -void tbufWriteBool(SBuffer* buf, bool data); -void tbufWriteBoolAt(SBuffer* buf, size_t pos, bool data); - -char tbufReadChar(SBuffer* buf); -void tbufWriteChar(SBuffer* buf, char data); -void tbufWriteCharAt(SBuffer* buf, size_t pos, char data); - -int8_t tbufReadInt8(SBuffer* buf); -void tbufWriteInt8(SBuffer* buf, int8_t data); -void tbufWriteInt8At(SBuffer* buf, size_t pos, int8_t data); - -uint8_t tbufReadUint8(SBuffer* buf); -void tbufWriteUint8(SBuffer* buf, uint8_t data); -void tbufWriteUint8At(SBuffer* buf, size_t pos, uint8_t data); - -int16_t tbufReadInt16(SBuffer* buf); -void tbufWriteInt16(SBuffer* buf, int16_t data); -void tbufWriteInt16At(SBuffer* buf, size_t pos, int16_t data); - -uint16_t tbufReadUint16(SBuffer* buf); -void tbufWriteUint16(SBuffer* buf, uint16_t data); -void tbufWriteUint16At(SBuffer* buf, size_t pos, uint16_t data); - -int32_t tbufReadInt32(SBuffer* buf); -void tbufWriteInt32(SBuffer* buf, int32_t data); -void tbufWriteInt32At(SBuffer* buf, size_t pos, int32_t data); - -uint32_t tbufReadUint32(SBuffer* buf); -void tbufWriteUint32(SBuffer* buf, uint32_t data); -void tbufWriteUint32At(SBuffer* buf, size_t pos, uint32_t data); - -int64_t tbufReadInt64(SBuffer* buf); -void tbufWriteInt64(SBuffer* buf, int64_t data); -void tbufWriteInt64At(SBuffer* buf, size_t pos, int64_t data); - -uint64_t tbufReadUint64(SBuffer* buf); -void tbufWriteUint64(SBuffer* buf, uint64_t data); -void tbufWriteUint64At(SBuffer* buf, size_t pos, uint64_t data); - -float tbufReadFloat(SBuffer* buf); -void tbufWriteFloat(SBuffer* buf, float data); -void tbufWriteFloatAt(SBuffer* buf, size_t pos, float data); - -double tbufReadDouble(SBuffer* buf); -void tbufWriteDouble(SBuffer* buf, double data); -void tbufWriteDoubleAt(SBuffer* buf, size_t pos, double data); +void tbufWriteBool( SBufferWriter* buf, bool data ); +void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ); +void tbufWriteChar( SBufferWriter* buf, char data ); +void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ); +void tbufWriteInt8( SBufferWriter* buf, int8_t data ); +void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ); +void tbufWriteUint8( SBufferWriter* buf, uint8_t data ); +void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ); +void tbufWriteInt16( SBufferWriter* buf, int16_t data ); +void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ); +void tbufWriteUint16( SBufferWriter* buf, uint16_t data ); +void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ); +void tbufWriteInt32( SBufferWriter* buf, int32_t data ); +void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ); +void tbufWriteUint32( SBufferWriter* buf, uint32_t data ); +void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ); +void tbufWriteInt64( SBufferWriter* buf, int64_t data ); +void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ); +void tbufWriteUint64( SBufferWriter* buf, uint64_t data ); +void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ); +void tbufWriteFloat( SBufferWriter* buf, float data ); +void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ); +void tbufWriteDouble( SBufferWriter* buf, double data ); +void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ); #ifdef __cplusplus } diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index c254436a4e..b2ded0203e 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -22,137 +22,188 @@ #include //////////////////////////////////////////////////////////////////////////////// -// common functions +// reader functions -void tbufSetup( - SBuffer* buf, - void* (*allocator)(void*, size_t), - bool endian -) { - if (allocator != NULL) { - buf->allocator = allocator; - } else { - buf->allocator = realloc; - } - - buf->endian = endian; -} - -size_t tbufTell(SBuffer* buf) { - return buf->pos; -} - -size_t tbufSeekTo(SBuffer* buf, size_t pos) { - if (pos > buf->size) { +size_t tbufSkip(SBufferReader* buf, size_t size) { + if( (buf->pos + size) > buf->size ) { THROW( TSDB_CODE_MEMORY_CORRUPTED ); } size_t old = buf->pos; - buf->pos = pos; + buf->pos += size; return old; } -void tbufClose(SBuffer* buf, bool keepData) { - if (!keepData) { - (*buf->allocator)(buf->data, 0); - } - buf->data = NULL; - buf->pos = 0; - buf->size = 0; -} - -//////////////////////////////////////////////////////////////////////////////// -// read functions - -void tbufBeginRead(SBuffer* buf, void* data, size_t len) { - buf->data = data; - buf->pos = 0; - buf->size = (data == NULL) ? 0 : len; -} - -size_t tbufSkip(SBuffer* buf, size_t size) { - return tbufSeekTo(buf, buf->pos + size); -} - -char* tbufRead(SBuffer* buf, size_t size) { +char* tbufRead( SBufferReader* buf, size_t size ) { char* ret = buf->data + buf->pos; - tbufSkip(buf, size); + tbufSkip( buf, size ); return ret; } -void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size) { - assert(dst != NULL); +void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ) { + assert( dst != NULL ); // always using memcpy, leave optimization to compiler - memcpy(dst, tbufRead(buf, size), size); + memcpy( dst, tbufRead(buf, size), size ); } -static size_t tbufReadLength(SBuffer* buf) { +static size_t tbufReadLength( SBufferReader* buf ) { // maximum length is 65535, if larger length is required // this function and the corresponding write function need to be // revised. - uint16_t l = tbufReadUint16(buf); + uint16_t l = tbufReadUint16( buf ); return l; } -const char* tbufReadString(SBuffer* buf, size_t* len) { - size_t l = tbufReadLength(buf); - char* ret = buf->data + buf->pos; - tbufSkip(buf, l + 1); - ret[l] = 0; // ensure the string end with '\0' - if (len != NULL) { +const char* tbufReadString( SBufferReader* buf, size_t* len ) { + size_t l = tbufReadLength( buf ); + char* ret = buf->data + buf->pos; + tbufSkip( buf, l + 1 ); + if( ret[l] != 0 ) { + THROW( TSDB_CODE_MEMORY_CORRUPTED ); + } + if( len != NULL ) { *len = l; } return ret; } -size_t tbufReadToString(SBuffer* buf, char* dst, size_t size) { - assert(dst != NULL); - size_t len; - const char* str = tbufReadString(buf, &len); +size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ) { + assert( dst != NULL ); + size_t len; + const char* str = tbufReadString( buf, &len ); if (len >= size) { len = size - 1; } - memcpy(dst, str, len); + memcpy( dst, str, len ); dst[len] = 0; return len; } -const char* tbufReadBinary(SBuffer* buf, size_t *len) { - size_t l = tbufReadLength(buf); +const char* tbufReadBinary( SBufferReader* buf, size_t *len ) { + size_t l = tbufReadLength( buf ); char* ret = buf->data + buf->pos; - tbufSkip(buf, l); - if (len != NULL) { + tbufSkip( buf, l ); + if( len != NULL ) { *len = l; } return ret; } -size_t tbufReadToBinary(SBuffer* buf, void* dst, size_t size) { - assert(dst != NULL); - size_t len; - const char* data = tbufReadBinary(buf, &len); - if (len >= size) { +size_t tbufReadToBinary( SBufferReader* buf, void* dst, size_t size ) { + assert( dst != NULL ); + size_t len; + const char* data = tbufReadBinary( buf, &len ); + if( len >= size ) { len = size; } - memcpy(dst, data, len); + memcpy( dst, data, len ); return len; } -//////////////////////////////////////////////////////////////////////////////// -// write functions +bool tbufReadBool( SBufferReader* buf ) { + bool ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} -void tbufBeginWrite(SBuffer* buf) { +char tbufReadChar( SBufferReader* buf ) { + char ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} + +int8_t tbufReadInt8( SBufferReader* buf ) { + int8_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} + +uint8_t tbufReadUint8( SBufferReader* buf ) { + uint8_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + return ret; +} + +int16_t tbufReadInt16( SBufferReader* buf ) { + int16_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int16_t)ntohs( ret ); + } + return ret; +} + +uint16_t tbufReadUint16( SBufferReader* buf ) { + uint16_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return ntohs( ret ); + } + return ret; +} + +int32_t tbufReadInt32( SBufferReader* buf ) { + int32_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int32_t)ntohl( ret ); + } + return ret; +} + +uint32_t tbufReadUint32( SBufferReader* buf ) { + uint32_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return ntohl( ret ); + } + return ret; +} + +int64_t tbufReadInt64( SBufferReader* buf ) { + int64_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return (int64_t)htobe64( ret ); // TODO: ntohll + } + return ret; +} + +uint64_t tbufReadUint64( SBufferReader* buf ) { + uint64_t ret; + tbufReadToBuffer( buf, &ret, sizeof(ret) ); + if( buf->endian ) { + return htobe64( ret ); // TODO: ntohll + } + return ret; +} + +float tbufReadFloat( SBufferReader* buf ) { + uint32_t ret = tbufReadUint32( buf ); + return *(float*)( &ret ); +} + +double tbufReadDouble(SBufferReader* buf) { + uint64_t ret = tbufReadUint64( buf ); + return *(double*)( &ret ); +} + +//////////////////////////////////////////////////////////////////////////////// +// writer functions + +void tbufCloseWriter( SBufferWriter* buf ) { + (*buf->allocator)( buf->data, 0 ); buf->data = NULL; buf->pos = 0; buf->size = 0; } -void tbufEnsureCapacity(SBuffer* buf, size_t size) { +void tbufEnsureCapacity( SBufferWriter* buf, size_t size ) { size += buf->pos; - if (size > buf->size) { + if( size > buf->size ) { size_t nsize = size + buf->size; - char* data = (*buf->allocator)(buf->data, nsize); - if (data == NULL) { - // TODO: handle client out of memory + char* data = (*buf->allocator)( buf->data, nsize ); + // TODO: the exception should be thrown by the allocator function + if( data == NULL ) { THROW( TSDB_CODE_SERV_OUT_OF_MEMORY ); } buf->data = data; @@ -160,279 +211,189 @@ void tbufEnsureCapacity(SBuffer* buf, size_t size) { } } -size_t tbufReserve(SBuffer* buf, size_t size) { - tbufEnsureCapacity(buf, size); - return tbufSeekTo(buf, buf->pos + size); +size_t tbufReserve( SBufferWriter* buf, size_t size ) { + tbufEnsureCapacity( buf, size ); + size_t old = buf->pos; + buf->pos += size; + return old; } -char* tbufGetData(SBuffer* buf, bool takeOver) { +char* tbufGetData( SBufferWriter* buf, bool takeOver ) { char* ret = buf->data; - if (takeOver) { + if( takeOver ) { buf->pos = 0; buf->size = 0; buf->data = NULL; } - return ret; } -void tbufWrite(SBuffer* buf, const void* data, size_t size) { - assert(data != NULL); - tbufEnsureCapacity(buf, size); - memcpy(buf->data + buf->pos, data, size); +void tbufWrite( SBufferWriter* buf, const void* data, size_t size ) { + assert( data != NULL ); + tbufEnsureCapacity( buf, size ); + memcpy( buf->data + buf->pos, data, size ); buf->pos += size; } -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size) { - assert(data != NULL); +void tbufWriteAt( SBufferWriter* buf, size_t pos, const void* data, size_t size ) { + assert( data != NULL ); // this function can only be called to fill the gap on previous writes, // so 'pos + size <= buf->pos' must be true - assert(pos + size <= buf->pos); - memcpy(buf->data + pos, data, size); + assert( pos + size <= buf->pos ); + memcpy( buf->data + pos, data, size ); } -static void tbufWriteLength(SBuffer* buf, size_t len) { +static void tbufWriteLength( SBufferWriter* buf, size_t len ) { // maximum length is 65535, if larger length is required // this function and the corresponding read function need to be // revised. - assert(len <= 0xffff); - tbufWriteUint16(buf, (uint16_t)len); + assert( len <= 0xffff ); + tbufWriteUint16( buf, (uint16_t)len ); } -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len) { - tbufWriteLength(buf, len); - tbufWrite(buf, str, len); - tbufWriteChar(buf, '\0'); +void tbufWriteStringLen( SBufferWriter* buf, const char* str, size_t len ) { + tbufWriteLength( buf, len ); + tbufWrite( buf, str, len ); + tbufWriteChar( buf, '\0' ); } -void tbufWriteString(SBuffer* buf, const char* str) { - tbufWriteStringLen(buf, str, strlen(str)); +void tbufWriteString( SBufferWriter* buf, const char* str ) { + tbufWriteStringLen( buf, str, strlen(str) ); } -void tbufWriteBinary(SBuffer* buf, const void* data, size_t len) { - tbufWriteLength(buf, len); - tbufWrite(buf, data, len); +void tbufWriteBinary( SBufferWriter* buf, const void* data, size_t len ) { + tbufWriteLength( buf, len ); + tbufWrite( buf, data, len ); } -//////////////////////////////////////////////////////////////////////////////// -// read / write functions for primitive types - -bool tbufReadBool(SBuffer* buf) { - bool ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - return ret; +void tbufWriteBool( SBufferWriter* buf, bool data ) { + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteBool(SBuffer* buf, bool data) { - tbufWrite(buf, &data, sizeof(data)); +void tbufWriteBoolAt( SBufferWriter* buf, size_t pos, bool data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteBoolAt(SBuffer* buf, size_t pos, bool data) { - tbufWriteAt(buf, pos, &data, sizeof(data)); +void tbufWriteChar( SBufferWriter* buf, char data ) { + tbufWrite( buf, &data, sizeof(data) ); } -char tbufReadChar(SBuffer* buf) { - char ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - return ret; +void tbufWriteCharAt( SBufferWriter* buf, size_t pos, char data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteChar(SBuffer* buf, char data) { - tbufWrite(buf, &data, sizeof(data)); +void tbufWriteInt8( SBufferWriter* buf, int8_t data ) { + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteCharAt(SBuffer* buf, size_t pos, char data) { - tbufWriteAt(buf, pos, &data, sizeof(data)); +void tbufWriteInt8At( SBufferWriter* buf, size_t pos, int8_t data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -int8_t tbufReadInt8(SBuffer* buf) { - int8_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - return ret; +void tbufWriteUint8( SBufferWriter* buf, uint8_t data ) { + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteInt8(SBuffer* buf, int8_t data) { - tbufWrite(buf, &data, sizeof(data)); +void tbufWriteUint8At( SBufferWriter* buf, size_t pos, uint8_t data ) { + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteInt8At(SBuffer* buf, size_t pos, int8_t data) { - tbufWriteAt(buf, pos, &data, sizeof(data)); -} - -uint8_t tbufReadUint8(SBuffer* buf) { - uint8_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - return ret; -} - -void tbufWriteUint8(SBuffer* buf, uint8_t data) { - tbufWrite(buf, &data, sizeof(data)); -} - -void tbufWriteUint8At(SBuffer* buf, size_t pos, uint8_t data) { - tbufWriteAt(buf, pos, &data, sizeof(data)); -} - -int16_t tbufReadInt16(SBuffer* buf) { - int16_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return (int16_t)ntohs(ret); +void tbufWriteInt16( SBufferWriter* buf, int16_t data ) { + if( buf->endian ) { + data = (int16_t)htons( data ); } - return ret; + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteInt16(SBuffer* buf, int16_t data) { - if (buf->endian) { - data = (int16_t)htons(data); +void tbufWriteInt16At( SBufferWriter* buf, size_t pos, int16_t data ) { + if( buf->endian ) { + data = (int16_t)htons( data ); } - tbufWrite(buf, &data, sizeof(data)); + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteInt16At(SBuffer* buf, size_t pos, int16_t data) { - if (buf->endian) { - data = (int16_t)htons(data); +void tbufWriteUint16( SBufferWriter* buf, uint16_t data ) { + if( buf->endian ) { + data = htons( data ); } - tbufWriteAt(buf, pos, &data, sizeof(data)); + tbufWrite( buf, &data, sizeof(data) ); } -uint16_t tbufReadUint16(SBuffer* buf) { - uint16_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return ntohs(ret); +void tbufWriteUint16At( SBufferWriter* buf, size_t pos, uint16_t data ) { + if( buf->endian ) { + data = htons( data ); } - return ret; + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteUint16(SBuffer* buf, uint16_t data) { - if (buf->endian) { - data = htons(data); +void tbufWriteInt32( SBufferWriter* buf, int32_t data ) { + if( buf->endian ) { + data = (int32_t)htonl( data ); } - tbufWrite(buf, &data, sizeof(data)); + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteUint16At(SBuffer* buf, size_t pos, uint16_t data) { - if (buf->endian) { - data = htons(data); +void tbufWriteInt32At( SBufferWriter* buf, size_t pos, int32_t data ) { + if( buf->endian ) { + data = (int32_t)htonl( data ); } - tbufWriteAt(buf, pos, &data, sizeof(data)); + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -int32_t tbufReadInt32(SBuffer* buf) { - int32_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return (int32_t)ntohl(ret); +void tbufWriteUint32( SBufferWriter* buf, uint32_t data ) { + if( buf->endian ) { + data = htonl( data ); } - return ret; + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteInt32(SBuffer* buf, int32_t data) { - if (buf->endian) { - data = (int32_t)htonl(data); +void tbufWriteUint32At( SBufferWriter* buf, size_t pos, uint32_t data ) { + if( buf->endian ) { + data = htonl( data ); } - tbufWrite(buf, &data, sizeof(data)); + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteInt32At(SBuffer* buf, size_t pos, int32_t data) { - if (buf->endian) { - data = (int32_t)htonl(data); +void tbufWriteInt64( SBufferWriter* buf, int64_t data ) { + if( buf->endian ) { + data = (int64_t)htobe64( data ); } - tbufWriteAt(buf, pos, &data, sizeof(data)); + tbufWrite( buf, &data, sizeof(data) ); } -uint32_t tbufReadUint32(SBuffer* buf) { - uint32_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return ntohl(ret); +void tbufWriteInt64At( SBufferWriter* buf, size_t pos, int64_t data ) { + if( buf->endian ) { + data = (int64_t)htobe64( data ); } - return ret; + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -void tbufWriteUint32(SBuffer* buf, uint32_t data) { - if (buf->endian) { - data = htonl(data); +void tbufWriteUint64( SBufferWriter* buf, uint64_t data ) { + if( buf->endian ) { + data = htobe64( data ); } - tbufWrite(buf, &data, sizeof(data)); + tbufWrite( buf, &data, sizeof(data) ); } -void tbufWriteUint32At(SBuffer* buf, size_t pos, uint32_t data) { - if (buf->endian) { - data = htonl(data); +void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ) { + if( buf->endian ) { + data = htobe64( data ); } - tbufWriteAt(buf, pos, &data, sizeof(data)); + tbufWriteAt( buf, pos, &data, sizeof(data) ); } -int64_t tbufReadInt64(SBuffer* buf) { - int64_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return (int64_t)htobe64(ret); // TODO: ntohll - } - return ret; +void tbufWriteFloat( SBufferWriter* buf, float data ) { + tbufWriteUint32( buf, *(uint32_t*)(&data) ); } -void tbufWriteInt64(SBuffer* buf, int64_t data) { - if (buf->endian) { - data = (int64_t)htobe64(data); - } - tbufWrite(buf, &data, sizeof(data)); +void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ) { + tbufWriteUint32At( buf, pos, *(uint32_t*)(&data) ); } -void tbufWriteInt64At(SBuffer* buf, size_t pos, int64_t data) { - if (buf->endian) { - data = (int64_t)htobe64(data); - } - tbufWriteAt(buf, pos, &data, sizeof(data)); +void tbufWriteDouble( SBufferWriter* buf, double data ) { + tbufWriteUint64( buf, *(uint64_t*)(&data) ); } -uint64_t tbufReadUint64(SBuffer* buf) { - uint64_t ret; - tbufReadToBuffer(buf, &ret, sizeof(ret)); - if (buf->endian) { - return htobe64(ret); // TODO: ntohll - } - return ret; -} - -void tbufWriteUint64(SBuffer* buf, uint64_t data) { - if (buf->endian) { - data = htobe64(data); - } - tbufWrite(buf, &data, sizeof(data)); -} - -void tbufWriteUint64At(SBuffer* buf, size_t pos, uint64_t data) { - if (buf->endian) { - data = htobe64(data); - } - tbufWriteAt(buf, pos, &data, sizeof(data)); -} - -float tbufReadFloat(SBuffer* buf) { - uint32_t ret = tbufReadUint32(buf); - return *(float*)(&ret); -} - -void tbufWriteFloat(SBuffer* buf, float data) { - tbufWriteUint32(buf, *(uint32_t*)(&data)); -} - -void tbufWriteFloatAt(SBuffer* buf, size_t pos, float data) { - tbufWriteUint32At(buf, pos, *(uint32_t*)(&data)); -} - -double tbufReadDouble(SBuffer* buf) { - uint64_t ret = tbufReadUint64(buf); - return *(double*)(&ret); -} - -void tbufWriteDouble(SBuffer* buf, double data) { - tbufWriteUint64(buf, *(uint64_t*)(&data)); -} - -void tbufWriteDoubleAt(SBuffer* buf, size_t pos, double data) { - tbufWriteUint64At(buf, pos, *(uint64_t*)(&data)); +void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ) { + tbufWriteUint64At( buf, pos, *(uint64_t*)(&data) ); } From 241f7a2239482f74cf7f00ba668b16192976eb25 Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 27 Apr 2020 10:23:07 +0800 Subject: [PATCH 6/7] TD-153: fix bugs --- src/util/inc/exception.h | 57 +++++++++++++++++++++++----------------- src/util/inc/tbuffer.h | 54 ++++++++++++++++++++++++++++++++++++- src/util/src/exception.c | 16 ++++++++--- src/util/src/tbuffer.c | 8 +++--- 4 files changed, 102 insertions(+), 33 deletions(-) diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h index 32e2fcb61b..41f01d68dd 100644 --- a/src/util/inc/exception.h +++ b/src/util/inc/exception.h @@ -52,25 +52,6 @@ typedef struct SCleanupAction { } arg1, arg2; } SCleanupAction; -void cleanupPush_void_ptr_ptr ( bool failOnly, void* func, void* arg1, void* arg2 ); -void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool arg2 ); -void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); -void cleanupPush_int_int ( bool failOnly, void* func, int arg ); -void cleanupPush_void ( bool failOnly, void* func ); - -int32_t cleanupGetActionCount(); -void cleanupExecute( int32_t anchor, bool failed ); - -#define CLEANUP_PUSH_VOID_PTR_PTR( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_ptr( (failOnly), (void*)(func), (void*)(arg1), (void*)(arg2) ) -#define CLEANUP_PUSH_VOID_PTR_BOOL( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_bool( (failOnly), (void*)(func), (void*)(arg1), (bool)(arg2) ) -#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) -#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) -#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) -#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) -#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) - -#define CLEANUP_CREATE_ANCHOR() int32_t cleanupAnchor = cleanupGetActionCount() -#define CLEANUP_EXECUTE( failed ) cleanupExecute( cleanupAnchor, (failed) ) /* * exception hander registration @@ -84,29 +65,57 @@ typedef struct SExceptionNode { SCleanupAction* cleanupActions; } SExceptionNode; +//////////////////////////////////////////////////////////////////////////////// +// functions & macros for auto-cleanup + +void cleanupPush_void_ptr_ptr ( bool failOnly, void* func, void* arg1, void* arg2 ); +void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool arg2 ); +void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); +void cleanupPush_int_int ( bool failOnly, void* func, int arg ); +void cleanupPush_void ( bool failOnly, void* func ); + +int32_t cleanupGetActionCount(); +void cleanupExecuteTo( int32_t anchor, bool failed ); +void cleanupExecute( SExceptionNode* node, bool failed ); + +#define CLEANUP_PUSH_VOID_PTR_PTR( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_ptr( (failOnly), (void*)(func), (void*)(arg1), (void*)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR_BOOL( failOnly, func, arg1, arg2 ) cleanupPush_void_ptr_bool( (failOnly), (void*)(func), (void*)(arg1), (bool)(arg2) ) +#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) +#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) +#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) +#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) +#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) + +#define CLEANUP_GET_ANCHOR() cleanupGetActionCount() +#define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) ) + + +//////////////////////////////////////////////////////////////////////////////// +// functions & macros for exception handling + void exceptionPushNode( SExceptionNode* node ); int32_t exceptionPopNode(); void exceptionThrow( int code ); #define TRY(maxCleanupActions) do { \ SExceptionNode exceptionNode = { 0 }; \ - SDeferedOperation cleanupActions[maxCleanupActions > 0 ? maxCleanupActions : 1]; \ - exceptionNode.maxCleanupAction = maxCleanupActions > 0 ? maxDefered : 1; \ + SCleanupAction cleanupActions[(maxCleanupActions) > 0 ? (maxCleanupActions) : 1]; \ + exceptionNode.maxCleanupAction = (maxCleanupActions) > 0 ? (maxCleanupActions) : 1; \ exceptionNode.cleanupActions = cleanupActions; \ - int32_t cleanupAnchor = 0; \ exceptionPushNode( &exceptionNode ); \ int caughtException = setjmp( exceptionNode.jb ); \ if( caughtException == 0 ) #define CATCH( code ) int code = exceptionPopNode(); \ - if( caughtEexception == 1 ) + if( caughtException == 1 ) #define FINALLY( code ) int code = exceptionPopNode(); #define END_TRY } while( 0 ); #define THROW( x ) exceptionThrow( (x) ) -#define CAUGHT_EXCEPTION() ((bool)(caughtEexception == 1)) +#define CAUGHT_EXCEPTION() ((bool)(caughtException == 1)) +#define CLEANUP_EXECUTE() cleanupExecute( &exceptionNode, CAUGHT_EXCEPTION() ) #ifdef __cplusplus } diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 103b3710cf..8f3f7f777e 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -23,6 +23,58 @@ extern "C" { #endif +//////////////////////////////////////////////////////////////////////////////// +// usage example +/* +#include +#include "exception.h" + +int main( int argc, char** argv ) { + SBufferWriter bw = tbufInitWriter( NULL, false ); + + TRY( 1 ) { + //--------------------- write ------------------------ + // reserve 1024 bytes for the buffer to improve performance + tbufEnsureCapacity( &bw, 1024 ); + + // reserve space for the interger count + size_t pos = tbufReserve( &bw, sizeof(int32_t) ); + // write 5 integers to the buffer + for( int i = 0; i < 5; i++) { + tbufWriteInt32( &bw, i ); + } + // write the integer count to buffer at reserved position + tbufWriteInt32At( &bw, pos, 5 ); + + // write a string to the buffer + tbufWriteString( &bw, "this is a string.\n" ); + // acquire the result and close the write buffer + size_t size = tbufTell( &bw ); + char* data = tbufGetData( &bw, false ); + + //------------------------ read ----------------------- + SBufferReader br = tbufInitReader( data, size, false ); + // read & print out all integers + int32_t count = tbufReadInt32( &br ); + for( int i = 0; i < count; i++ ) { + printf( "%d\n", tbufReadInt32(&br) ); + } + // read & print out a string + puts( tbufReadString(&br, NULL) ); + // try read another integer, this result in an error as there no this integer + tbufReadInt32( &br ); + printf( "you should not see this message.\n" ); + } CATCH( code ) { + printf( "exception code is: %d, you will see this message after print out 5 integers and a string.\n", code ); + // throw it again and the exception will be caught in main + THROW( code ); + } END_TRY + + tbufCloseWriter( &bw ); + return 0; +} +*/ + typedef struct { bool endian; const char* data; @@ -51,7 +103,7 @@ typedef struct { size_t tbufSkip( SBufferReader* buf, size_t size ); -char* tbufRead( SBufferReader* buf, size_t size ); +const char* tbufRead( SBufferReader* buf, size_t size ); void tbufReadToBuffer( SBufferReader* buf, void* dst, size_t size ); const char* tbufReadString( SBufferReader* buf, size_t* len ); size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ); diff --git a/src/util/src/exception.c b/src/util/src/exception.c index 27cf6fbcd6..7f8f91c784 100644 --- a/src/util/src/exception.c +++ b/src/util/src/exception.c @@ -114,11 +114,19 @@ int32_t cleanupGetActionCount() { } -void cleanupExecute( int32_t anchor, bool failed ) { - while( expList->numCleanupAction > anchor ) { - --expList->numCleanupAction; - SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction; +static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed ) { + while( node->numCleanupAction > anchor ) { + --node->numCleanupAction; + SCleanupAction *ca = node->cleanupActions + node->numCleanupAction; if( failed || !(ca->failOnly) ) wrappers[ca->wrapper]( ca ); } } + +void cleanupExecuteTo( int32_t anchor, bool failed ) { + doExecuteCleanup( expList, anchor, failed ); +} + +void cleanupExecute( SExceptionNode* node, bool failed ) { + doExecuteCleanup( node, 0, failed ); +} \ No newline at end of file diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index b2ded0203e..3b4cc74cc3 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -33,8 +33,8 @@ size_t tbufSkip(SBufferReader* buf, size_t size) { return old; } -char* tbufRead( SBufferReader* buf, size_t size ) { - char* ret = buf->data + buf->pos; +const char* tbufRead( SBufferReader* buf, size_t size ) { + const char* ret = buf->data + buf->pos; tbufSkip( buf, size ); return ret; } @@ -55,7 +55,7 @@ static size_t tbufReadLength( SBufferReader* buf ) { const char* tbufReadString( SBufferReader* buf, size_t* len ) { size_t l = tbufReadLength( buf ); - char* ret = buf->data + buf->pos; + const char* ret = buf->data + buf->pos; tbufSkip( buf, l + 1 ); if( ret[l] != 0 ) { THROW( TSDB_CODE_MEMORY_CORRUPTED ); @@ -80,7 +80,7 @@ size_t tbufReadToString( SBufferReader* buf, char* dst, size_t size ) { const char* tbufReadBinary( SBufferReader* buf, size_t *len ) { size_t l = tbufReadLength( buf ); - char* ret = buf->data + buf->pos; + const char* ret = buf->data + buf->pos; tbufSkip( buf, l ); if( len != NULL ) { *len = l; From 036695d45955d6d7f0a6112f613d40ac518db241 Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 27 Apr 2020 14:03:02 +0800 Subject: [PATCH 7/7] TD-153: make whole project compile --- src/client/inc/tscUtil.h | 3 +- src/client/src/tscSQLParser.c | 26 ++++- src/client/src/tscUtil.c | 8 +- src/query/inc/qast.h | 5 +- src/query/src/qast.c | 184 +++++++++++++++++----------------- src/query/tests/astTest.cpp | 18 ++-- src/tsdb/src/tsdbRead.c | 37 +++++-- src/util/inc/tbuffer.h | 3 +- 8 files changed, 157 insertions(+), 127 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index d46c32d73d..718dfcf475 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -25,6 +25,7 @@ extern "C" { */ #include "os.h" #include "tbuffer.h" +#include "exception.h" #include "qextbuffer.h" #include "taosdef.h" #include "tscSecondaryMerge.h" @@ -177,7 +178,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid); -void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf); +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 6e16606695..5590ac5a01 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1185,10 +1185,18 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause"); } - SBuffer buf = exprTreeToBinary(pNode); + SBufferWriter bw = tbufInitWriter(NULL, false); + + TRY(0) { + exprTreeToBinary(&bw, pNode); + } CATCH(code) { + tbufCloseWriter(&bw); + UNUSED(code); + // TODO: other error handling + } END_TRY - size_t len = tbufTell(&buf); - char* c = tbufGetData(&buf, true); + size_t len = tbufTell(&bw); + char* c = tbufGetData(&bw, true); // set the serialized binary string as the parameter of arithmetic expression addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, len, index.tableIndex); @@ -3751,7 +3759,15 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, SArray* colList = taosArrayInit(10, sizeof(SColIndex)); ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, colList); - SBuffer buf = exprTreeToBinary(p); + SBufferWriter bw = tbufInitWriter(NULL, false); + + TRY(0) { + exprTreeToBinary(&bw, p); + } CATCH(code) { + tbufCloseWriter(&bw); + UNUSED(code); + // TODO: more error handling + } END_TRY // add to source column list STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); @@ -3765,7 +3781,7 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr, addRequiredTagColumn(pTableMetaInfo, &index); } - tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf); + tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &bw); doCompactQueryExpr(pExpr); tSQLExprDestroy(p1); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6b8b2b38b4..88ce13e560 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -47,18 +47,18 @@ SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { return NULL; } -void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf) { - if (tbufTell(pBuf) == 0) { +void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) { + if (tbufTell(bw) == 0) { return; } SCond cond = { .uid = uid, - .len = tbufTell(pBuf), + .len = tbufTell(bw), .cond = NULL, }; - cond.cond = tbufGetData(pBuf, true); + cond.cond = tbufGetData(bw, true); if (pTagCond->pCond == NULL) { pTagCond->pCond = taosArrayInit(3, sizeof(SCond)); diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 903d54a18f..6c997d5a36 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -90,9 +90,10 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res); uint8_t getBinaryExprOptr(SSQLToken *pToken); -SBuffer exprTreeToBinary(tExprNode* pExprTree); +void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); +void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); -tExprNode* exprTreeFromBinary(const void* pBuf, size_t size); +tExprNode* exprTreeFromBinary(const void* data, size_t size); tExprNode* exprTreeFromTableName(const char* tbnameCond); #ifdef __cplusplus diff --git a/src/query/src/qast.c b/src/query/src/qast.c index fdcbeeeac0..500a5f1e49 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -31,6 +31,7 @@ #include "tskiplist.h" #include "queryLog.h" #include "tsdbMain.h" +#include "exception.h" /* * @@ -44,7 +45,6 @@ * */ static tExprNode *tExprNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken); -static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i); static void destroySyntaxTree(tExprNode *); @@ -428,7 +428,7 @@ void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len) { static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); } -static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { +void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { if (pNode == NULL) { return; } @@ -1023,104 +1023,116 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) { } } -static void exprTreeToBinaryImpl(tExprNode* pExprTree, SBuffer* pBuf) { - tbufWrite(pBuf, &pExprTree->nodeType, sizeof(pExprTree->nodeType)); +static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) { + tbufWriteUint8(bw, expr->nodeType); - if (pExprTree->nodeType == TSQL_NODE_VALUE) { - tVariant* pVal = pExprTree->pVal; + if (expr->nodeType == TSQL_NODE_VALUE) { + tVariant* pVal = expr->pVal; - tbufWrite(pBuf, &pVal->nType, sizeof(pVal->nType)); + tbufWriteUint32(bw, pVal->nType); if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufWrite(pBuf, &pVal->nLen, sizeof(pVal->nLen)); - tbufWrite(pBuf, pVal->pz, pVal->nLen); + tbufWriteInt32(bw, pVal->nLen); + tbufWrite(bw, pVal->pz, pVal->nLen); } else { - tbufWrite(pBuf, &pVal->pz, sizeof(pVal->i64Key)); + tbufWriteInt64(bw, pVal->i64Key); } - } else if (pExprTree->nodeType == TSQL_NODE_COL) { - SSchema* pSchema = pExprTree->pSchema; - tbufWrite(pBuf, &pSchema->colId, sizeof(pSchema->colId)); - tbufWrite(pBuf, &pSchema->bytes, sizeof(pSchema->bytes)); - tbufWrite(pBuf, &pSchema->type, sizeof(pSchema->type)); + } else if (expr->nodeType == TSQL_NODE_COL) { + SSchema* pSchema = expr->pSchema; + tbufWriteInt16(bw, pSchema->colId); + tbufWriteInt16(bw, pSchema->bytes); + tbufWriteUint8(bw, pSchema->type); + tbufWriteString(bw, pSchema->name); - int32_t len = strlen(pSchema->name); - tbufWriteStringLen(pBuf, pSchema->name, len); - - } else if (pExprTree->nodeType == TSQL_NODE_EXPR) { - tbufWrite(pBuf, &pExprTree->_node.optr, sizeof(pExprTree->_node.optr)); - tbufWrite(pBuf, &pExprTree->_node.hasPK, sizeof(pExprTree->_node.hasPK)); - - exprTreeToBinaryImpl(pExprTree->_node.pLeft, pBuf); - exprTreeToBinaryImpl(pExprTree->_node.pRight, pBuf); + } else if (expr->nodeType == TSQL_NODE_EXPR) { + tbufWriteUint8(bw, expr->_node.optr); + tbufWriteUint8(bw, expr->_node.hasPK); + exprTreeToBinaryImpl(bw, expr->_node.pLeft); + exprTreeToBinaryImpl(bw, expr->_node.pRight); } } -SBuffer exprTreeToBinary(tExprNode* pExprTree) { - SBuffer buf = {0}; - if (pExprTree == NULL) { - return buf; +void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) { + if (expr != NULL) { + exprTreeToBinaryImpl(bw, expr); } - - int32_t code = tbufBeginWrite(&buf); - if (code != 0) { - return buf; - } - - exprTreeToBinaryImpl(pExprTree, &buf); - return buf; } -static tExprNode* exprTreeFromBinaryImpl(SBuffer* pBuf) { - tExprNode* pExpr = calloc(1, sizeof(tExprNode)); - pExpr->nodeType = tbufReadUint8(pBuf); +// TODO: these three functions should be made global +static void* exception_calloc(size_t nmemb, size_t size) { + void* p = calloc(nmemb, size); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); + } + return p; +} + +static void* exception_malloc(size_t size) { + void* p = malloc(size); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); + } + return p; +} + +static char* exception_strdup(const char* str) { + char* p = strdup(str); + if (p == NULL) { + THROW(TSDB_CODE_SERV_OUT_OF_MEMORY); + } + return p; +} + + +static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) { + int32_t anchor = CLEANUP_GET_ANCHOR(); + + tExprNode* pExpr = exception_calloc(1, sizeof(tExprNode)); + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, pExpr, NULL); + + pExpr->nodeType = tbufReadUint8(br); if (pExpr->nodeType == TSQL_NODE_VALUE) { - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); pExpr->pVal = pVal; - pVal->nType = tbufReadUint32(pBuf); + pVal->nType = tbufReadUint32(br); if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufReadToBuffer(pBuf, &pVal->nLen, sizeof(pVal->nLen)); + tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen)); pVal->pz = calloc(1, pVal->nLen + 1); - tbufReadToBuffer(pBuf, pVal->pz, pVal->nLen); + tbufReadToBuffer(br, pVal->pz, pVal->nLen); } else { - pVal->i64Key = tbufReadInt64(pBuf); + pVal->i64Key = tbufReadInt64(br); } } else if (pExpr->nodeType == TSQL_NODE_COL) { - SSchema* pSchema = calloc(1, sizeof(SSchema)); - if (pSchema == NULL) { - // TODO: - } + SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); pExpr->pSchema = pSchema; - pSchema->colId = tbufReadInt16(pBuf); - pSchema->bytes = tbufReadInt16(pBuf); - pSchema->type = tbufReadUint8(pBuf); - tbufReadToString(pBuf, pSchema->name, TSDB_COL_NAME_LEN); + pSchema->colId = tbufReadInt16(br); + pSchema->bytes = tbufReadInt16(br); + pSchema->type = tbufReadUint8(br); + tbufReadToString(br, pSchema->name, TSDB_COL_NAME_LEN); } else if (pExpr->nodeType == TSQL_NODE_EXPR) { - pExpr->_node.optr = tbufReadUint8(pBuf); - pExpr->_node.hasPK = tbufReadUint8(pBuf); - pExpr->_node.pLeft = exprTreeFromBinaryImpl(pBuf); - pExpr->_node.pRight = exprTreeFromBinaryImpl(pBuf); + pExpr->_node.optr = tbufReadUint8(br); + pExpr->_node.hasPK = tbufReadUint8(br); + pExpr->_node.pLeft = exprTreeFromBinaryImpl(br); + pExpr->_node.pRight = exprTreeFromBinaryImpl(br); assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL); } + CLEANUP_EXECUTE_TO(anchor, false); return pExpr; } -tExprNode* exprTreeFromBinary(const void* pBuf, size_t size) { +tExprNode* exprTreeFromBinary(const void* data, size_t size) { if (size == 0) { return NULL; } - SBuffer rbuf = {0}; - tbufBeginRead(&rbuf, pBuf, size); - return exprTreeFromBinaryImpl(&rbuf); + SBufferReader br = tbufInitReader(data, size, false); + return exprTreeFromBinaryImpl(&br); } tExprNode* exprTreeFromTableName(const char* tbnameCond) { @@ -1128,23 +1140,18 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { return NULL; } - tExprNode* expr = calloc(1, sizeof(tExprNode)); - if (expr == NULL) { - // TODO: - } + int32_t anchor = CLEANUP_GET_ANCHOR(); + + tExprNode* expr = exception_calloc(1, sizeof(tExprNode)); + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL); + expr->nodeType = TSQL_NODE_EXPR; - tExprNode* left = calloc(1, sizeof(tExprNode)); - if (left == NULL) { - // TODO: - } + tExprNode* left = exception_calloc(1, sizeof(tExprNode)); expr->_node.pLeft = left; left->nodeType = TSQL_NODE_COL; - SSchema* pSchema = calloc(1, sizeof(SSchema)); - if (pSchema == NULL) { - // TODO: - } + SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); left->pSchema = pSchema; pSchema->type = TSDB_DATA_TYPE_BINARY; @@ -1152,36 +1159,24 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { strcpy(pSchema->name, TSQL_TBNAME_L); pSchema->colId = -1; - tExprNode* right = calloc(1, sizeof(tExprNode)); - if (right == NULL) { - // TODO - } + tExprNode* right = exception_calloc(1, sizeof(tExprNode)); expr->_node.pRight = right; if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) { right->nodeType = TSQL_NODE_VALUE; expr->_node.optr = TSDB_RELATION_LIKE; - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); right->pVal = pVal; - pVal->nType = TSDB_DATA_TYPE_BINARY; size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1; - pVal->pz = malloc(len); - if (pVal->pz == NULL) { - // TODO: - } + pVal->pz = exception_malloc(len); memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len); + pVal->nType = TSDB_DATA_TYPE_BINARY; pVal->nLen = (int32_t)len; } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) { right->nodeType = TSQL_NODE_VALUE; expr->_node.optr = TSDB_RELATION_IN; - tVariant* pVal = calloc(1, sizeof(tVariant)); - if (pVal == NULL) { - // TODO: - } + tVariant* pVal = exception_calloc(1, sizeof(tVariant)); right->pVal = pVal; pVal->nType = TSDB_DATA_TYPE_ARRAY; pVal->arr = taosArrayInit(2, sizeof(char*)); @@ -1192,7 +1187,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { cond = e + 1; } else if (*e == ',') { size_t len = e - cond + 1; - char* p = malloc( len ); + char* p = exception_malloc( len ); memcpy(p, cond, len); p[len - 1] = 0; cond += len; @@ -1201,12 +1196,13 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { } if (*cond != 0) { - char* p = strdup( cond ); + char* p = exception_strdup( cond ); taosArrayPush(pVal->arr, &p); } taosArraySortString(pVal->arr); } + CLEANUP_EXECUTE_TO(anchor, false); return expr; } \ No newline at end of file diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp index 6a78cfbe53..dee85ef630 100644 --- a/src/query/tests/astTest.cpp +++ b/src/query/tests/astTest.cpp @@ -550,11 +550,12 @@ tExprNode* createExpr2() { void exprSerializeTest1() { tExprNode* p1 = createExpr1(); - SBuffer buf = exprTreeToBinary(p1); + SBufferWriter bw = tbufInitWriter(NULL, false); + exprTreeToBinary(&bw, p1); - size_t size = tbufTell(&buf); + size_t size = tbufTell(&bw); ASSERT_TRUE(size > 0); - char* b = tbufGetData(&buf, false); + char* b = tbufGetData(&bw, false); tExprNode* p2 = exprTreeFromBinary(b, size); ASSERT_EQ(p1->nodeType, p2->nodeType); @@ -581,16 +582,17 @@ void exprSerializeTest1() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&buf, false); + tbufClose(&bw); } void exprSerializeTest2() { tExprNode* p1 = createExpr2(); - SBuffer buf = exprTreeToBinary(p1); + SBufferWriter bw = tbufInitWriter(NULL, false); + exprTreeToBinary(&bw, p1); - size_t size = tbufTell(&buf); + size_t size = tbufTell(&bw); ASSERT_TRUE(size > 0); - char* b = tbufGetData(&buf, false); + char* b = tbufGetData(&bw, false); tExprNode* p2 = exprTreeFromBinary(b, size); ASSERT_EQ(p1->nodeType, p2->nodeType); @@ -625,7 +627,7 @@ void exprSerializeTest2() { tExprTreeDestroy(&p1, nullptr); tExprTreeDestroy(&p2, nullptr); - tbufClose(&buf, false); + tbufClose(&bw); } } // namespace TEST(testCase, astTest) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index eb35be5383..bc9220dbc7 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -18,6 +18,7 @@ #include "talgo.h" #include "tutil.h" #include "tcompare.h" +#include "exception.h" #include "../../../query/inc/qast.h" // todo move to common module #include "../../../query/inc/tlosertree.h" // todo move to util module @@ -1473,21 +1474,35 @@ int32_t tsdbQueryByTagsCond( } int32_t ret = TSDB_CODE_SUCCESS; + tExprNode* expr = NULL; - tExprNode* expr = exprTreeFromTableName(tbnameCond); - tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); - if (tagExpr != NULL) { + TRY(32) { + expr = exprTreeFromTableName(tbnameCond); if (expr == NULL) { - expr = tagExpr; + expr = exprTreeFromBinary(pTagCond, len); } else { - tExprNode* tbnameExpr = expr; - expr = calloc(1, sizeof(tExprNode)); - expr->nodeType = TSQL_NODE_EXPR; - expr->_node.optr = tagNameRelType; - expr->_node.pLeft = tagExpr; - expr->_node.pRight = tbnameExpr; + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL); + tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); + if (tagExpr != NULL) { + CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL); + tExprNode* tbnameExpr = expr; + expr = calloc(1, sizeof(tExprNode)); + if (expr == NULL) { + THROW( TSDB_CODE_SERV_OUT_OF_MEMORY ); + } + expr->nodeType = TSQL_NODE_EXPR; + expr->_node.optr = tagNameRelType; + expr->_node.pLeft = tagExpr; + expr->_node.pRight = tbnameExpr; + } } - } + CLEANUP_EXECUTE(); + + } CATCH( code ) { + CLEANUP_EXECUTE(); + ret = code; + // TODO: more error handling + } END_TRY doQueryTableList(pSTable, res, expr); pGroupInfo->numOfTables = taosArrayGetSize(res); diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 8f3f7f777e..e2bdb815d7 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -66,8 +66,6 @@ int main( int argc, char** argv ) { printf( "you should not see this message.\n" ); } CATCH( code ) { printf( "exception code is: %d, you will see this message after print out 5 integers and a string.\n", code ); - // throw it again and the exception will be caught in main - THROW( code ); } END_TRY tbufCloseWriter( &bw ); @@ -92,6 +90,7 @@ typedef struct { //////////////////////////////////////////////////////////////////////////////// // common functions & macros for both reader & writer + #define tbufTell( buf ) ((buf)->pos)