From 2013ba0d8f9b97b4f304925cc4bed0c8276db309 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 17:54:51 +0800 Subject: [PATCH 01/18] fix: fix asan problem --- source/dnode/mgmt/node_util/inc/dmUtil.h | 12 ++++----- source/dnode/mnode/impl/inc/mndInt.h | 14 +++++----- source/dnode/vnode/src/meta/metaTable.c | 4 +-- source/libs/index/src/indexFstFile.c | 18 +++++++++---- source/libs/transport/src/transCli.c | 34 +++++++++++++++--------- 5 files changed, 50 insertions(+), 32 deletions(-) diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index c2f403dfbb..6fec5b4551 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -53,12 +53,12 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);} -#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);} -#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);} +#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 785ecc2bf5..880c1c063d 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -41,12 +41,12 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);} -#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);} -#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} +#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -80,7 +80,7 @@ typedef struct { typedef struct { TdThreadMutex lock; - char email[TSDB_FQDN_LEN]; + char email[TSDB_FQDN_LEN]; } STelemMgmt; typedef struct { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 94aa464354..3325f4055c 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -1921,10 +1921,10 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_ // refactor if (IS_VAR_DATA_TYPE(type)) { memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE); - memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData); + if (pTagData != NULL) memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData); *(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid; } else { - memcpy((*ppTagIdxKey)->data, pTagData, nTagData); + if (pTagData != NULL) memcpy((*ppTagIdxKey)->data, pTagData, nTagData); *(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid; } diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 5a9c8dfe3d..40c50ed9cb 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -18,6 +18,7 @@ #include "indexInt.h" #include "indexUtil.h" #include "os.h" +#include "osDef.h" #include "tutil.h" static int32_t kBlockSize = 4096; @@ -172,7 +173,8 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); ctx->file.wBufOffset = 0; } - taosFsyncFile(ctx->file.pFile); + int ret = taosFsyncFile(ctx->file.pFile); + UNUSED(ret); } else { // do nothing } @@ -180,11 +182,11 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { } IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) { + int code = 0; IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx)); if (ctx == NULL) { return NULL; } - ctx->type = type; if (ctx->type == TFILE) { // ugly code, refactor later @@ -192,15 +194,21 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int memcpy(ctx->file.buf, path, strlen(path)); if (readOnly == false) { ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - taosFtruncateFile(ctx->file.pFile, 0); - taosStatFile(path, &ctx->file.size, NULL); + + code = taosFtruncateFile(ctx->file.pFile, 0); + UNUSED(code); + + code = taosStatFile(path, &ctx->file.size, NULL); + UNUSED(code); ctx->file.wBufOffset = 0; ctx->file.wBufCap = kBlockSize * 4; ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap); } else { ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); - taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + code = taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + UNUSED(code); + ctx->file.wBufOffset = 0; #ifdef USE_MMAP diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e1aeafaad..90daf296de 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -12,7 +12,9 @@ * along with this program. If not, see . */ +// #include "osMemory.h" #include "transComm.h" +#include "tutil.h" typedef struct SConnList { queue conns; @@ -224,9 +226,14 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } while (0); // snprintf may cause performance problem -#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ - do { \ - snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ +#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ + do { \ + char* t = key; \ + int16_t len = strlen(ip); \ + memcpy(t, ip, len); \ + t += len; \ + t[len] = ':'; \ + titoa(port, 10, &t[len + 1]); \ } while (0) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) @@ -330,12 +337,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - if (pCliMsg == NULL) - return false; - else { - cliSend(conn); - return true; - } + cliSend(conn); + return true; } return false; _RETURN: @@ -359,6 +362,7 @@ void cliHandleResp(SCliConn* conn) { int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); if (msgLen <= 0) { + taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } @@ -1705,17 +1709,23 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = createThrdObj(shandle); if (pThrd == NULL) { - return NULL; + goto _err; } int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); - if (err == 0) { + if (err != 0) { + goto _err; + } else { tDebug("success to create tranport-cli thread:%d", i); } cli->pThreadObj[i] = pThrd; } - return cli; + +_err: + taosMemoryFree(cli->pThreadObj); + taosMemoryFree(cli); + return NULL; } static FORCE_INLINE void destroyUserdata(STransMsg* userdata) { From 2e25f7e90de8c99b2ab6457176729c4531d1461b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 18:07:48 +0800 Subject: [PATCH 02/18] fix: fix asan problem --- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 6fec5b4551..55ee6d6973 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -58,7 +58,7 @@ extern "C" { #define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}} #define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}} #define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}} -#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on From de4a0047c5e43fc24b593edf229d83806b64e4b9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 23 Feb 2023 18:00:38 +0800 Subject: [PATCH 03/18] fix coverity issue CID:399957 --- include/libs/nodes/querynodes.h | 8 ++++---- include/util/tjson.h | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/nodes/src/nodesMsgFuncs.c | 4 ++-- source/util/src/tjson.c | 9 ++++++++- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 1a9700907e..20b8bf5c46 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -38,10 +38,10 @@ typedef struct SRawExprNode { } SRawExprNode; typedef struct SDataType { - uint8_t type; - uint8_t precision; - uint8_t scale; - int32_t bytes; + uint16_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; } SDataType; typedef struct SExprNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 6922930c13..24bdc31bad 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -76,6 +76,7 @@ int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pV int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal); +int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal); int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 099cd0d3b3..2051da1181 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2972,7 +2972,7 @@ static int32_t dataTypeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToDataType(const SJson* pJson, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tjsonGetUTinyIntValue(pJson, jkDataTypeType, &pNode->type); + int32_t code = tjsonGetUSmallIntValue(pJson, jkDataTypeType, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUTinyIntValue(pJson, jkDataTypePrecision, &pNode->precision); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index ad80508c64..fadce124af 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -619,7 +619,7 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t msgToDataTypeInline(STlvDecoder* pDecoder, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tlvDecodeValueI8(pDecoder, &pNode->type); + int32_t code = tlvDecodeValueU16(pDecoder, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueU8(pDecoder, &pNode->precision); } @@ -641,7 +641,7 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { case DATA_TYPE_CODE_TYPE: - code = tlvDecodeI8(pTlv, &pNode->type); + code = tlvDecodeU16(pTlv, &pNode->type); break; case DATA_TYPE_CODE_PRECISION: code = tlvDecodeU8(pTlv, &pNode->precision); diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 27d14d05b1..6741c3038f 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -250,6 +250,13 @@ int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) return code; } +int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal) { + uint64_t val = 0; + int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { uint64_t val = 0; int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); @@ -375,4 +382,4 @@ bool tjsonValidateJson(const char* jIn) { return true; } -const char* tjsonGetError() { return cJSON_GetErrorPtr(); } \ No newline at end of file +const char* tjsonGetError() { return cJSON_GetErrorPtr(); } From f294d60681ae186f1511ba1013f47ad9c42c05c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 20:40:29 +0800 Subject: [PATCH 04/18] fix: fix asan problem --- include/libs/transport/thttp.h | 4 +- source/libs/transport/src/thttp.c | 71 ++++++++++++++++------------ source/libs/transport/src/transCli.c | 11 ++--- source/libs/transport/src/transSvr.c | 2 + 4 files changed, 50 insertions(+), 38 deletions(-) diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index 9a6aee4187..f6f1f7f027 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -17,6 +17,7 @@ #define _TD_UTIL_HTTP_H_ #include "os.h" +#include "tref.h" #ifdef __cplusplus extern "C" { @@ -24,7 +25,8 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 8e5f79137f..9ad50c1466 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -26,6 +26,8 @@ #define HTTP_RECV_BUF_SIZE 1024 +static int32_t httpRefMgt = 0; +static int64_t httpRef = -1; typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; @@ -41,7 +43,6 @@ typedef struct SHttpMsg { int32_t len; EHttpCompFlag flag; int8_t quit; - SHttpModule* http; } SHttpMsg; @@ -57,7 +58,6 @@ typedef struct SHttpClient { } SHttpClient; static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; -static SHttpModule* thttp = NULL; static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); @@ -280,26 +280,28 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } int32_t httpSendQuit() { + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) return 0; + SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); msg->quit = 1; - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { - httpDestroyMsg(msg); - tError("http-report already released"); - return -1; - } else { - msg->http = load; - } - transAsyncSend(load->asyncPool, &(msg->q)); + transAsyncSend(http->asyncPool, &(msg->q)); + taosReleaseRef(httpRefMgt, httpRef); return 0; } static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + if (load == NULL) { + tError("http-report already released"); + return -1; + } + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = strdup(server); - msg->uri = strdup(uri); + msg->uri = strdup(uri); msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); @@ -307,15 +309,9 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 msg->flag = flag; msg->quit = 0; - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { - httpDestroyMsg(msg); - tError("http-report already released"); - return -1; - } - - msg->http = load; - return transAsyncSend(load->asyncPool, &(msg->q)); + int ret = transAsyncSend(load->asyncPool, &(msg->q)); + taosReleaseRef(httpRefMgt, httpRef); + return ret; } static void httpDestroyClientCb(uv_handle_t* handle) { @@ -335,13 +331,19 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { - SHttpModule* http = msg->http; taosMemoryFree(msg); + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) return; + uv_walk(http->loop, httpWalkCb, NULL); + taosReleaseRef(httpRefMgt, httpRef); } static void httpHandleReq(SHttpMsg* msg) { - SHttpModule* http = msg->http; + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) { + goto END; + } struct sockaddr_in dest = {0}; if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { @@ -391,6 +393,7 @@ static void httpHandleReq(SHttpMsg* msg) { int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + taosReleaseRef(httpRefMgt, httpRef); destroyHttpClient(cli); return; } @@ -401,21 +404,26 @@ static void httpHandleReq(SHttpMsg* msg) { cli->port); destroyHttpClient(cli); } + taosReleaseRef(httpRefMgt, httpRef); return; END: tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, httpRef); } -int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); } +static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } static void transHttpEnvInit() { - SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); + SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(http->loop); @@ -426,21 +434,22 @@ static void transHttpEnvInit() { http = NULL; return; } - + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { taosMemoryFree(http->loop); taosMemoryFree(http); http = NULL; } - atomic_store_ptr(&thttp, http); + httpRef = taosAddRef(httpRefMgt, http); } void transHttpEnvDestroy() { - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { + // remove http + if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) { return; } + SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); httpSendQuit(); taosThreadJoin(load->thread, NULL); @@ -448,7 +457,7 @@ void transHttpEnvDestroy() { transAsyncPoolDestroy(load->asyncPool); uv_loop_close(load->loop); taosMemoryFree(load->loop); - taosMemoryFree(load); - atomic_store_ptr(&thttp, NULL); + taosReleaseRef(httpRefMgt, httpRef); + taosRemoveRef(httpRefMgt, httpRef); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 90daf296de..19ee4fe690 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -67,15 +67,13 @@ typedef struct SCliConn { SCliBatch* pBatch; - int64_t refId; - char* ip; - SDelayTask* task; - // debug and log info - char src[32]; - char dst[32]; + char* ip; + char src[32]; + char dst[32]; + int64_t refId; } SCliConn; typedef struct SCliMsg { @@ -134,6 +132,7 @@ typedef struct { int32_t threshold; int64_t interval; } SFailFastItem; + // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 04e094ae9a..822969132b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -14,6 +14,8 @@ #include "transComm.h" +static int32_t httpRefMgt = 0; + static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; From d34c70d172ea4e8efd58d3f52eb1769d157adaec Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 24 Feb 2023 09:57:57 +0800 Subject: [PATCH 05/18] Revert "fix coverity issue CID:399957" This reverts commit de4a0047c5e43fc24b593edf229d83806b64e4b9. --- include/libs/nodes/querynodes.h | 8 ++++---- include/util/tjson.h | 1 - source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/nodes/src/nodesMsgFuncs.c | 4 ++-- source/util/src/tjson.c | 9 +-------- 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 20b8bf5c46..1a9700907e 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -38,10 +38,10 @@ typedef struct SRawExprNode { } SRawExprNode; typedef struct SDataType { - uint16_t type; - uint8_t precision; - uint8_t scale; - int32_t bytes; + uint8_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; } SDataType; typedef struct SExprNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 24bdc31bad..6922930c13 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -76,7 +76,6 @@ int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pV int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal); -int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal); int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 2051da1181..099cd0d3b3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2972,7 +2972,7 @@ static int32_t dataTypeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToDataType(const SJson* pJson, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tjsonGetUSmallIntValue(pJson, jkDataTypeType, &pNode->type); + int32_t code = tjsonGetUTinyIntValue(pJson, jkDataTypeType, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUTinyIntValue(pJson, jkDataTypePrecision, &pNode->precision); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index fadce124af..ad80508c64 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -619,7 +619,7 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t msgToDataTypeInline(STlvDecoder* pDecoder, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tlvDecodeValueU16(pDecoder, &pNode->type); + int32_t code = tlvDecodeValueI8(pDecoder, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueU8(pDecoder, &pNode->precision); } @@ -641,7 +641,7 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { case DATA_TYPE_CODE_TYPE: - code = tlvDecodeU16(pTlv, &pNode->type); + code = tlvDecodeI8(pTlv, &pNode->type); break; case DATA_TYPE_CODE_PRECISION: code = tlvDecodeU8(pTlv, &pNode->precision); diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 6741c3038f..27d14d05b1 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -250,13 +250,6 @@ int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) return code; } -int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal) { - uint64_t val = 0; - int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); - *pVal = val; - return code; -} - int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { uint64_t val = 0; int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); @@ -382,4 +375,4 @@ bool tjsonValidateJson(const char* jIn) { return true; } -const char* tjsonGetError() { return cJSON_GetErrorPtr(); } +const char* tjsonGetError() { return cJSON_GetErrorPtr(); } \ No newline at end of file From 6ff76e57aee9d3d9af6f8a5b1e4e38874b8ae046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 09:59:32 +0800 Subject: [PATCH 06/18] fix: opt trans debug info --- source/libs/transport/src/transSvr.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 822969132b..04e094ae9a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -14,8 +14,6 @@ #include "transComm.h" -static int32_t httpRefMgt = 0; - static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; From 49fc02c52e95c8fdcc4d2271f21c6b7b21e58895 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 24 Feb 2023 10:08:53 +0800 Subject: [PATCH 07/18] fix coverity issue --- include/common/ttypes.h | 4 ---- source/common/src/tvariant.c | 33 --------------------------------- source/libs/scalar/src/filter.c | 18 +----------------- 3 files changed, 1 insertion(+), 54 deletions(-) diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 97ae151b7a..f8a85ee1b0 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -53,10 +53,6 @@ typedef struct { #define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0])) #define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v)) -// this data type is internally used only in 'in' query to hold the values -#define TSDB_DATA_TYPE_POINTER_ARRAY (1000) -#define TSDB_DATA_TYPE_VALUE_ARRAY (1001) - #define GET_TYPED_DATA(_v, _finalType, _type, _data) \ do { \ switch (_type) { \ diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c index de225581a6..db69fe9d48 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -145,19 +145,6 @@ void taosVariantDestroy(SVariant *pVar) { pVar->nLen = 0; } - // NOTE: this is only for string array - if (pVar->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { - size_t num = taosArrayGetSize(pVar->arr); - for (size_t i = 0; i < num; i++) { - void *p = taosArrayGetP(pVar->arr, i); - taosMemoryFree(p); - } - taosArrayDestroy(pVar->arr); - pVar->arr = NULL; - } else if (pVar->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { - taosArrayDestroy(pVar->arr); - pVar->arr = NULL; - } } void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) { @@ -180,28 +167,8 @@ void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) { if (IS_NUMERIC_TYPE(pSrc->nType) || (pSrc->nType == TSDB_DATA_TYPE_BOOL)) { pDst->i = pSrc->i; - } else if (pSrc->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { // this is only for string array - size_t num = taosArrayGetSize(pSrc->arr); - pDst->arr = taosArrayInit(num, sizeof(char *)); - for (size_t i = 0; i < num; i++) { - char *p = (char *)taosArrayGetP(pSrc->arr, i); - char *n = strdup(p); - taosArrayPush(pDst->arr, &n); - } - } else if (pSrc->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { - size_t num = taosArrayGetSize(pSrc->arr); - pDst->arr = taosArrayInit(num, sizeof(int64_t)); - pDst->nLen = pSrc->nLen; - ASSERT(pSrc->nLen == num); - for (size_t i = 0; i < num; i++) { - int64_t *p = taosArrayGet(pSrc->arr, i); - taosArrayPush(pDst->arr, p); - } } - if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) { - pDst->nLen = tDataTypes[pDst->nType].bytes; - } } int32_t taosVariantCompare(const SVariant *p1, const SVariant *p2) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 25e65d2588..c0c8072d81 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1651,12 +1651,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) SValueNode *var = (SValueNode *)field->desc; SDataType *dType = &var->node.resType; - // if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { - // qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data, - // *(((int64_t *)field->data) + 1)); - // } else { qDebug("VAL%d => [type:%d][val:%" PRIx64 "]", i, dType->type, var->datum.i); // TODO - //} } else if (field->data) { qDebug("VAL%d => [type:NIL][val:NIL]", i); // TODO } @@ -1976,18 +1971,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) { fi->data = taosMemoryCalloc(1, bytes); } else { - if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { // TIME RANGE - /* - fi->data = taosMemoryCalloc(dType->bytes, tDataTypes[type].bytes); - for (int32_t a = 0; a < dType->bytes; ++a) { - int64_t *v = taosArrayGet(var->arr, a); - assignVal((char *)fi->data + a * tDataTypes[type].bytes, (char *)v, 0, type); - } - */ - continue; - } else { - fi->data = taosMemoryCalloc(1, sizeof(int64_t)); - } + fi->data = taosMemoryCalloc(1, sizeof(int64_t)); } if (dType->type == type) { From 5025f1a38a36aa9d37e8c92e6e533b7b4c411dff Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 10:13:28 +0800 Subject: [PATCH 08/18] fix: opt trans debug info --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 19ee4fe690..c4c6931f64 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -229,8 +229,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); do { \ char* t = key; \ int16_t len = strlen(ip); \ - memcpy(t, ip, len); \ - t += len; \ + if (ip ! = NULL) memcpy(t, ip, len); \ t[len] = ':'; \ titoa(port, 10, &t[len + 1]); \ } while (0) @@ -2312,6 +2311,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; + pCtx->origEpSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; From b5d4709374644874641ca6892a47ce33a69f53b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 11:07:07 +0800 Subject: [PATCH 09/18] fix: coverity scan proble --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c4c6931f64..8aebcbd576 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -229,7 +229,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); do { \ char* t = key; \ int16_t len = strlen(ip); \ - if (ip ! = NULL) memcpy(t, ip, len); \ + if (ip != NULL) memcpy(t, ip, len); \ t[len] = ':'; \ titoa(port, 10, &t[len + 1]); \ } while (0) From eac27fc0fd1454aa057b8e82a61c6a6f46b13f1e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Feb 2023 18:39:17 +0800 Subject: [PATCH 10/18] fix: empty ts range query issue --- source/libs/parser/src/parTranslater.c | 3 +++ tests/parallel_test/cases.task | 1 + tests/script/tsim/query/emptyTsRange.sim | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+) create mode 100644 tests/script/tsim/query/emptyTsRange.sim diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7b6f795ecf..7e9ca9554b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3342,6 +3342,9 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange); } + if (TSDB_CODE_SUCCESS == code && pSelect->timeRange.skey > pSelect->timeRange.ekey) { + pSelect->isEmptyResult = true; + } return code; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 63760d6ae4..867f16d71a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -181,6 +181,7 @@ ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/event.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim +,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/emptyTsRange.sim b/tests/script/tsim/query/emptyTsRange.sim new file mode 100644 index 0000000000..ca3daf2bbb --- /dev/null +++ b/tests/script/tsim/query/emptyTsRange.sim @@ -0,0 +1,20 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql drop database if exists db1; +sql create database if not exists db1; +sql use db1; +sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int); +sql create table tba1 using sta tags(1); +sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a"); +sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b"); +sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b"); +sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b"); +sql select last_row(*) from sta where ts >= 1678901803783 and ts <= 1678901803783 and _c0 <= 1678901803782 interval(10d,8d) fill(linear) order by _wstart desc; +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 66322311110981913ac09c2fcf341c9712b24120 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 23:42:57 +0800 Subject: [PATCH 11/18] fix: limit session num --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index f7da9cba25..04b546b36a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -302,7 +302,7 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = taosStrdup(server); - msg->uri = taosStrdup(uri); + msg->uri = taosStrdup(uri); msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); @@ -447,7 +447,7 @@ static void transHttpEnvInit() { void transHttpEnvDestroy() { // remove http - if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) { + if (httpRef == -1) { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); From f933f604e466f312b262dd072a15b40c86d5897c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Feb 2023 11:09:24 +0800 Subject: [PATCH 12/18] fix: undefine the strdup by default. --- include/os/osMemory.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 3f57c72933..c35fd782fb 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -29,9 +29,12 @@ extern "C" { #define calloc CALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup #define strdup STRDUP_FUNC_TAOS_FORBID -#endif // ifndef ALLOW_FORBID_FUNC +#endif +#endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) int32_t taosMemoryDbgInit(); From 3e9cc93b9f44b0f5dc9a85930574a785280756a5 Mon Sep 17 00:00:00 2001 From: huolibo Date: Sat, 25 Feb 2023 12:54:24 +0800 Subject: [PATCH 13/18] enh(driver): tmq async commit callback (#20114) --- .../jni/com_taosdata_jdbc_tmq_TMQConnector.h | 3 + source/client/src/clientJniConnector.c | 1 + source/client/src/clientTmqConnector.c | 66 +++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 197cd78006..c035b6598c 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -99,6 +99,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI */ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, + jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp diff --git a/source/client/src/clientJniConnector.c b/source/client/src/clientJniConnector.c index 750ba684f4..cfa6f84bd2 100644 --- a/source/client/src/clientJniConnector.c +++ b/source/client/src/clientJniConnector.c @@ -56,6 +56,7 @@ jmethodID g_createConsumerErrorCallback; jmethodID g_topicListCallback; jclass g_consumerClass; +// deprecated jmethodID g_commitCallback; void jniGetGlobalMethod(JNIEnv *env) { diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index ccfc4980bc..a8c9f2279d 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -17,6 +17,36 @@ #include "jniCommon.h" #include "taos.h" +int __init_tmq = 0; +jmethodID g_offsetCallback; + +void tmqGlobalMethod(JNIEnv *env) { + // make sure init function executed once + switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) { + case 0: + break; + case 1: + do { + taosMsleep(0); + } while (atomic_load_32(&__init_tmq) == 1); + case 2: + return; + } + + if (g_vm == NULL) { + (*env)->GetJavaVM(env, &g_vm); + } + + jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback"); + jclass g_offsetCallbackClass = (*env)->NewGlobalRef(env, offset); + g_offsetCallback = (*env)->GetMethodID(env, g_offsetCallbackClass, "commitCallbackHandler", "(I)V"); + (*env)->DeleteLocalRef(env, offset); + + atomic_store_32(&__init_tmq, 2); + jniDebug("tmq method register finished"); +} + +// deprecated void commit_cb(tmq_t *tmq, int32_t code, void *param) { JNIEnv *env = NULL; int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6); @@ -40,6 +70,28 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) { env = NULL; } +void consumer_callback(tmq_t *tmq, int32_t code, void *param) { + JNIEnv *env = NULL; + int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6); + bool needDetach = false; + if (status < 0) { + if ((*g_vm)->AttachCurrentThread(g_vm, (void **)&env, NULL) != 0) { + return; + } + needDetach = true; + } + + jobject obj = (jobject)param; + (*env)->CallVoidMethod(env, obj, g_offsetCallback, code); + (*env)->DeleteGlobalRef(env, obj); + param = NULL; + + if (needDetach) { + (*g_vm)->DetachCurrentThread(g_vm); + } + env = NULL; +} + JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConfNewImp(JNIEnv *env, jobject jobj) { tmq_conf_t *conf = tmq_conf_new(); jniGetGlobalMethod(env); @@ -201,6 +253,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI return tmq_commit_sync(tmq, res); } +// deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, jlong jres, jobject consumer) { tmq_t *tmq = (tmq_t *)jtmq; @@ -213,6 +266,19 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN tmq_commit_async(tmq, res, commit_cb, consumer); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, + jlong jres, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return; + } + TAOS_RES *res = (TAOS_RES *)jres; + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, res, consumer_callback, offset); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; From 0e613ea3f381d3a3c349873b02b5e0f9a1857ec2 Mon Sep 17 00:00:00 2001 From: huolibo Date: Sat, 25 Feb 2023 14:48:25 +0800 Subject: [PATCH 14/18] enh(driver): add spring + mybatis type:byte[] example (#20050) * enh(driver): add spring + mybatis type:byte[] example * doc: add init description * docs: add byte[] description --- examples/JDBC/springbootdemo/readme.md | 7 +++++ .../springbootdemo/dao/WeatherMapper.xml | 10 +++++-- .../springbootdemo/domain/Weather.java | 30 +++++++++++++++++++ .../service/WeatherService.java | 2 ++ 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/examples/JDBC/springbootdemo/readme.md b/examples/JDBC/springbootdemo/readme.md index a3942a6a51..a89e21c009 100644 --- a/examples/JDBC/springbootdemo/readme.md +++ b/examples/JDBC/springbootdemo/readme.md @@ -1,6 +1,13 @@ ## TDengine SpringBoot + Mybatis Demo ## 需要提前创建 test 数据库 + +``` +$ taos -s 'create database if not exists test' + +$ curl http://localhost:8080/weather/init +``` + ### 配置 application.properties ```properties # datasource config diff --git a/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml b/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml index 99d5893ec1..4899ec4654 100644 --- a/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml +++ b/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml @@ -7,6 +7,7 @@ + - insert into test.t#{groupId} (ts, temperature, humidity, note) - values (#{ts}, ${temperature}, ${humidity}, #{note}) + insert into test.t#{groupId} (ts, temperature, humidity, note, bytes) + values (#{ts}, ${temperature}, ${humidity}, #{note}, #{bytes})