fix: fix asan problem

This commit is contained in:
yihaoDeng 2023-02-23 20:40:29 +08:00
parent 2e25f7e90d
commit f294d60681
4 changed files with 50 additions and 38 deletions

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_HTTP_H_ #define _TD_UTIL_HTTP_H_
#include "os.h" #include "os.h"
#include "tref.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -24,7 +25,8 @@ extern "C" {
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -26,6 +26,8 @@
#define HTTP_RECV_BUF_SIZE 1024 #define HTTP_RECV_BUF_SIZE 1024
static int32_t httpRefMgt = 0;
static int64_t httpRef = -1;
typedef struct SHttpModule { typedef struct SHttpModule {
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
@ -41,7 +43,6 @@ typedef struct SHttpMsg {
int32_t len; int32_t len;
EHttpCompFlag flag; EHttpCompFlag flag;
int8_t quit; int8_t quit;
SHttpModule* http;
} SHttpMsg; } SHttpMsg;
@ -57,7 +58,6 @@ typedef struct SHttpClient {
} SHttpClient; } SHttpClient;
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static SHttpModule* thttp = NULL;
static void transHttpEnvInit(); static void transHttpEnvInit();
static void httpHandleReq(SHttpMsg* msg); static void httpHandleReq(SHttpMsg* msg);
@ -280,26 +280,28 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
} }
int32_t httpSendQuit() { int32_t httpSendQuit() {
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) return 0;
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
msg->quit = 1; msg->quit = 1;
SHttpModule* load = atomic_load_ptr(&thttp); transAsyncSend(http->asyncPool, &(msg->q));
if (load == NULL) { taosReleaseRef(httpRefMgt, httpRef);
httpDestroyMsg(msg);
tError("http-report already released");
return -1;
} else {
msg->http = load;
}
transAsyncSend(load->asyncPool, &(msg->q));
return 0; return 0;
} }
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) { EHttpCompFlag flag) {
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
if (load == NULL) {
tError("http-report already released");
return -1;
}
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
msg->server = strdup(server); msg->server = strdup(server);
msg->uri = strdup(uri); msg->uri = strdup(uri);
msg->port = port; msg->port = port;
msg->cont = taosMemoryMalloc(contLen); msg->cont = taosMemoryMalloc(contLen);
memcpy(msg->cont, pCont, contLen); memcpy(msg->cont, pCont, contLen);
@ -307,15 +309,9 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1
msg->flag = flag; msg->flag = flag;
msg->quit = 0; msg->quit = 0;
SHttpModule* load = atomic_load_ptr(&thttp); int ret = transAsyncSend(load->asyncPool, &(msg->q));
if (load == NULL) { taosReleaseRef(httpRefMgt, httpRef);
httpDestroyMsg(msg); return ret;
tError("http-report already released");
return -1;
}
msg->http = load;
return transAsyncSend(load->asyncPool, &(msg->q));
} }
static void httpDestroyClientCb(uv_handle_t* handle) { static void httpDestroyClientCb(uv_handle_t* handle) {
@ -335,13 +331,19 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
return; return;
} }
static void httpHandleQuit(SHttpMsg* msg) { static void httpHandleQuit(SHttpMsg* msg) {
SHttpModule* http = msg->http;
taosMemoryFree(msg); taosMemoryFree(msg);
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) return;
uv_walk(http->loop, httpWalkCb, NULL); uv_walk(http->loop, httpWalkCb, NULL);
taosReleaseRef(httpRefMgt, httpRef);
} }
static void httpHandleReq(SHttpMsg* msg) { static void httpHandleReq(SHttpMsg* msg) {
SHttpModule* http = msg->http; SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) {
goto END;
}
struct sockaddr_in dest = {0}; struct sockaddr_in dest = {0};
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
@ -391,6 +393,7 @@ static void httpHandleReq(SHttpMsg* msg) {
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) { if (ret != 0) {
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
taosReleaseRef(httpRefMgt, httpRef);
destroyHttpClient(cli); destroyHttpClient(cli);
return; return;
} }
@ -401,21 +404,26 @@ static void httpHandleReq(SHttpMsg* msg) {
cli->port); cli->port);
destroyHttpClient(cli); destroyHttpClient(cli);
} }
taosReleaseRef(httpRefMgt, httpRef);
return; return;
END: END:
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
httpDestroyMsg(msg); httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, httpRef);
} }
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
taosThreadOnce(&transHttpInit, transHttpEnvInit); taosThreadOnce(&transHttpInit, transHttpEnvInit);
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
} }
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
static void transHttpEnvInit() { static void transHttpEnvInit() {
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(http->loop); uv_loop_init(http->loop);
@ -426,21 +434,22 @@ static void transHttpEnvInit() {
http = NULL; http = NULL;
return; return;
} }
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) { if (err != 0) {
taosMemoryFree(http->loop); taosMemoryFree(http->loop);
taosMemoryFree(http); taosMemoryFree(http);
http = NULL; http = NULL;
} }
atomic_store_ptr(&thttp, http); httpRef = taosAddRef(httpRefMgt, http);
} }
void transHttpEnvDestroy() { void transHttpEnvDestroy() {
SHttpModule* load = atomic_load_ptr(&thttp); // remove http
if (load == NULL) { if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) {
return; return;
} }
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
httpSendQuit(); httpSendQuit();
taosThreadJoin(load->thread, NULL); taosThreadJoin(load->thread, NULL);
@ -448,7 +457,7 @@ void transHttpEnvDestroy() {
transAsyncPoolDestroy(load->asyncPool); transAsyncPoolDestroy(load->asyncPool);
uv_loop_close(load->loop); uv_loop_close(load->loop);
taosMemoryFree(load->loop); taosMemoryFree(load->loop);
taosMemoryFree(load);
atomic_store_ptr(&thttp, NULL); taosReleaseRef(httpRefMgt, httpRef);
taosRemoveRef(httpRefMgt, httpRef);
} }

View File

@ -67,15 +67,13 @@ typedef struct SCliConn {
SCliBatch* pBatch; SCliBatch* pBatch;
int64_t refId;
char* ip;
SDelayTask* task; SDelayTask* task;
// debug and log info char* ip;
char src[32]; char src[32];
char dst[32]; char dst[32];
int64_t refId;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -134,6 +132,7 @@ typedef struct {
int32_t threshold; int32_t threshold;
int64_t interval; int64_t interval;
} SFailFastItem; } SFailFastItem;
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);

View File

@ -14,6 +14,8 @@
#include "transComm.h" #include "transComm.h"
static int32_t httpRefMgt = 0;
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a"; static char* notify = "a";