Merge pull request #20131 from taosdata/fix/coverity_yihao
fix: fix asan problem
This commit is contained in:
commit
90c6641cff
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_UTIL_HTTP_H_
|
#define _TD_UTIL_HTTP_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tref.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -24,7 +25,8 @@ extern "C" {
|
||||||
|
|
||||||
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
|
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
|
||||||
|
|
||||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag);
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
|
EHttpCompFlag flag);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,12 +53,12 @@ extern "C" {
|
||||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -41,12 +41,12 @@ extern "C" {
|
||||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
@ -80,7 +80,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThreadMutex lock;
|
TdThreadMutex lock;
|
||||||
char email[TSDB_FQDN_LEN];
|
char email[TSDB_FQDN_LEN];
|
||||||
} STelemMgmt;
|
} STelemMgmt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1921,10 +1921,10 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
|
||||||
// refactor
|
// refactor
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE);
|
memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE);
|
||||||
memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
|
if (pTagData != NULL) memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
|
||||||
*(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid;
|
*(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid;
|
||||||
} else {
|
} else {
|
||||||
memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
if (pTagData != NULL) memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
||||||
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
#include "indexUtil.h"
|
#include "indexUtil.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "osDef.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
static int32_t kBlockSize = 4096;
|
static int32_t kBlockSize = 4096;
|
||||||
|
@ -172,7 +173,8 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
|
||||||
int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
|
int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
|
||||||
ctx->file.wBufOffset = 0;
|
ctx->file.wBufOffset = 0;
|
||||||
}
|
}
|
||||||
taosFsyncFile(ctx->file.pFile);
|
int ret = taosFsyncFile(ctx->file.pFile);
|
||||||
|
UNUSED(ret);
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
@ -180,11 +182,11 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
||||||
|
int code = 0;
|
||||||
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
|
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
|
||||||
if (ctx == NULL) {
|
if (ctx == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->type = type;
|
ctx->type = type;
|
||||||
if (ctx->type == TFILE) {
|
if (ctx->type == TFILE) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
|
@ -192,15 +194,21 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
memcpy(ctx->file.buf, path, strlen(path));
|
memcpy(ctx->file.buf, path, strlen(path));
|
||||||
if (readOnly == false) {
|
if (readOnly == false) {
|
||||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
taosFtruncateFile(ctx->file.pFile, 0);
|
|
||||||
taosStatFile(path, &ctx->file.size, NULL);
|
code = taosFtruncateFile(ctx->file.pFile, 0);
|
||||||
|
UNUSED(code);
|
||||||
|
|
||||||
|
code = taosStatFile(path, &ctx->file.size, NULL);
|
||||||
|
UNUSED(code);
|
||||||
|
|
||||||
ctx->file.wBufOffset = 0;
|
ctx->file.wBufOffset = 0;
|
||||||
ctx->file.wBufCap = kBlockSize * 4;
|
ctx->file.wBufCap = kBlockSize * 4;
|
||||||
ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap);
|
ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap);
|
||||||
} else {
|
} else {
|
||||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||||
taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
code = taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
||||||
|
UNUSED(code);
|
||||||
|
|
||||||
ctx->file.wBufOffset = 0;
|
ctx->file.wBufOffset = 0;
|
||||||
|
|
||||||
#ifdef USE_MMAP
|
#ifdef USE_MMAP
|
||||||
|
|
|
@ -26,6 +26,8 @@
|
||||||
|
|
||||||
#define HTTP_RECV_BUF_SIZE 1024
|
#define HTTP_RECV_BUF_SIZE 1024
|
||||||
|
|
||||||
|
static int32_t httpRefMgt = 0;
|
||||||
|
static int64_t httpRef = -1;
|
||||||
typedef struct SHttpModule {
|
typedef struct SHttpModule {
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
|
@ -41,7 +43,6 @@ typedef struct SHttpMsg {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
EHttpCompFlag flag;
|
EHttpCompFlag flag;
|
||||||
int8_t quit;
|
int8_t quit;
|
||||||
SHttpModule* http;
|
|
||||||
|
|
||||||
} SHttpMsg;
|
} SHttpMsg;
|
||||||
|
|
||||||
|
@ -57,7 +58,6 @@ typedef struct SHttpClient {
|
||||||
} SHttpClient;
|
} SHttpClient;
|
||||||
|
|
||||||
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
||||||
static SHttpModule* thttp = NULL;
|
|
||||||
static void transHttpEnvInit();
|
static void transHttpEnvInit();
|
||||||
|
|
||||||
static void httpHandleReq(SHttpMsg* msg);
|
static void httpHandleReq(SHttpMsg* msg);
|
||||||
|
@ -280,26 +280,29 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t httpSendQuit() {
|
int32_t httpSendQuit() {
|
||||||
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
|
if (http == NULL) return 0;
|
||||||
|
|
||||||
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
||||||
msg->quit = 1;
|
msg->quit = 1;
|
||||||
|
|
||||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
transAsyncSend(http->asyncPool, &(msg->q));
|
||||||
if (load == NULL) {
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
httpDestroyMsg(msg);
|
|
||||||
tError("http-report already released");
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
msg->http = load;
|
|
||||||
}
|
|
||||||
transAsyncSend(load->asyncPool, &(msg->q));
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
|
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
|
if (load == NULL) {
|
||||||
|
tError("http-report already released");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
||||||
|
|
||||||
msg->server = taosStrdup(server);
|
msg->server = taosStrdup(server);
|
||||||
msg->uri = taosStrdup(uri);
|
msg->uri = taosStrdup(uri);
|
||||||
msg->port = port;
|
msg->port = port;
|
||||||
msg->cont = taosMemoryMalloc(contLen);
|
msg->cont = taosMemoryMalloc(contLen);
|
||||||
memcpy(msg->cont, pCont, contLen);
|
memcpy(msg->cont, pCont, contLen);
|
||||||
|
@ -307,15 +310,9 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1
|
||||||
msg->flag = flag;
|
msg->flag = flag;
|
||||||
msg->quit = 0;
|
msg->quit = 0;
|
||||||
|
|
||||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
int ret = transAsyncSend(load->asyncPool, &(msg->q));
|
||||||
if (load == NULL) {
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
httpDestroyMsg(msg);
|
return ret;
|
||||||
tError("http-report already released");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
msg->http = load;
|
|
||||||
return transAsyncSend(load->asyncPool, &(msg->q));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void httpDestroyClientCb(uv_handle_t* handle) {
|
static void httpDestroyClientCb(uv_handle_t* handle) {
|
||||||
|
@ -335,13 +332,19 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static void httpHandleQuit(SHttpMsg* msg) {
|
static void httpHandleQuit(SHttpMsg* msg) {
|
||||||
SHttpModule* http = msg->http;
|
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
|
||||||
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
|
if (http == NULL) return;
|
||||||
|
|
||||||
uv_walk(http->loop, httpWalkCb, NULL);
|
uv_walk(http->loop, httpWalkCb, NULL);
|
||||||
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
}
|
}
|
||||||
static void httpHandleReq(SHttpMsg* msg) {
|
static void httpHandleReq(SHttpMsg* msg) {
|
||||||
SHttpModule* http = msg->http;
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
|
if (http == NULL) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
struct sockaddr_in dest = {0};
|
struct sockaddr_in dest = {0};
|
||||||
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
|
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
|
||||||
|
@ -391,6 +394,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
||||||
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -401,21 +405,26 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
cli->port);
|
cli->port);
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
return;
|
return;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
|
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
|
EHttpCompFlag flag) {
|
||||||
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
||||||
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
|
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
||||||
static void transHttpEnvInit() {
|
static void transHttpEnvInit() {
|
||||||
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
||||||
|
|
||||||
|
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||||
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
uv_loop_init(http->loop);
|
uv_loop_init(http->loop);
|
||||||
|
|
||||||
|
@ -426,21 +435,22 @@ static void transHttpEnvInit() {
|
||||||
http = NULL;
|
http = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
taosMemoryFree(http->loop);
|
taosMemoryFree(http->loop);
|
||||||
taosMemoryFree(http);
|
taosMemoryFree(http);
|
||||||
http = NULL;
|
http = NULL;
|
||||||
}
|
}
|
||||||
atomic_store_ptr(&thttp, http);
|
httpRef = taosAddRef(httpRefMgt, http);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transHttpEnvDestroy() {
|
void transHttpEnvDestroy() {
|
||||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
// remove http
|
||||||
if (load == NULL) {
|
if (httpRef == -1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
httpSendQuit();
|
httpSendQuit();
|
||||||
taosThreadJoin(load->thread, NULL);
|
taosThreadJoin(load->thread, NULL);
|
||||||
|
|
||||||
|
@ -448,7 +458,7 @@ void transHttpEnvDestroy() {
|
||||||
transAsyncPoolDestroy(load->asyncPool);
|
transAsyncPoolDestroy(load->asyncPool);
|
||||||
uv_loop_close(load->loop);
|
uv_loop_close(load->loop);
|
||||||
taosMemoryFree(load->loop);
|
taosMemoryFree(load->loop);
|
||||||
taosMemoryFree(load);
|
|
||||||
|
|
||||||
atomic_store_ptr(&thttp, NULL);
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
|
taosRemoveRef(httpRefMgt, httpRef);
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// #include "osMemory.h"
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
typedef struct SConnList {
|
typedef struct SConnList {
|
||||||
queue conns;
|
queue conns;
|
||||||
|
@ -65,15 +67,13 @@ typedef struct SCliConn {
|
||||||
|
|
||||||
SCliBatch* pBatch;
|
SCliBatch* pBatch;
|
||||||
|
|
||||||
int64_t refId;
|
|
||||||
char* ip;
|
|
||||||
|
|
||||||
SDelayTask* task;
|
SDelayTask* task;
|
||||||
|
|
||||||
// debug and log info
|
char* ip;
|
||||||
char src[32];
|
char src[32];
|
||||||
char dst[32];
|
char dst[32];
|
||||||
|
|
||||||
|
int64_t refId;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
|
@ -132,6 +132,7 @@ typedef struct {
|
||||||
int32_t threshold;
|
int32_t threshold;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
} SFailFastItem;
|
} SFailFastItem;
|
||||||
|
|
||||||
// conn pool
|
// conn pool
|
||||||
// add expire timeout and capacity limit
|
// add expire timeout and capacity limit
|
||||||
static void* createConnPool(int size);
|
static void* createConnPool(int size);
|
||||||
|
@ -224,9 +225,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// snprintf may cause performance problem
|
// snprintf may cause performance problem
|
||||||
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
||||||
do { \
|
do { \
|
||||||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
char* t = key; \
|
||||||
|
int16_t len = strlen(ip); \
|
||||||
|
if (ip != NULL) memcpy(t, ip, len); \
|
||||||
|
t[len] = ':'; \
|
||||||
|
titoa(port, 10, &t[len + 1]); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||||
|
@ -330,12 +335,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
if (!transQueueEmpty(&conn->cliMsgs)) {
|
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||||
SCliMsg* pCliMsg = NULL;
|
SCliMsg* pCliMsg = NULL;
|
||||||
CONN_GET_NEXT_SENDMSG(conn);
|
CONN_GET_NEXT_SENDMSG(conn);
|
||||||
if (pCliMsg == NULL)
|
cliSend(conn);
|
||||||
return false;
|
return true;
|
||||||
else {
|
|
||||||
cliSend(conn);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
_RETURN:
|
_RETURN:
|
||||||
|
@ -359,6 +360,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
|
taosMemoryFree(pHead);
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1727,17 +1729,23 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||||
SCliThrd* pThrd = createThrdObj(shandle);
|
SCliThrd* pThrd = createThrdObj(shandle);
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
return NULL;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
|
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
|
||||||
if (err == 0) {
|
if (err != 0) {
|
||||||
|
goto _err;
|
||||||
|
} else {
|
||||||
tDebug("success to create tranport-cli thread:%d", i);
|
tDebug("success to create tranport-cli thread:%d", i);
|
||||||
}
|
}
|
||||||
cli->pThreadObj[i] = pThrd;
|
cli->pThreadObj[i] = pThrd;
|
||||||
}
|
}
|
||||||
|
|
||||||
return cli;
|
return cli;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
taosMemoryFree(cli->pThreadObj);
|
||||||
|
taosMemoryFree(cli);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
||||||
|
|
Loading…
Reference in New Issue