|
|
@ -19,7 +19,6 @@
|
|
|
|
#include "zlib.h"
|
|
|
|
#include "zlib.h"
|
|
|
|
#include "thttp.h"
|
|
|
|
#include "thttp.h"
|
|
|
|
#include "taoserror.h"
|
|
|
|
#include "taoserror.h"
|
|
|
|
#include "tlog.h"
|
|
|
|
|
|
|
|
#include "transComm.h"
|
|
|
|
#include "transComm.h"
|
|
|
|
|
|
|
|
|
|
|
|
// clang-format on
|
|
|
|
// clang-format on
|
|
|
@ -27,14 +26,18 @@
|
|
|
|
#define HTTP_RECV_BUF_SIZE 1024
|
|
|
|
#define HTTP_RECV_BUF_SIZE 1024
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t httpRefMgt = 0;
|
|
|
|
static int32_t httpRefMgt = 0;
|
|
|
|
static int64_t httpRef = -1;
|
|
|
|
|
|
|
|
static int32_t FAST_FAILURE_LIMIT = 1;
|
|
|
|
static int32_t FAST_FAILURE_LIMIT = 1;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int64_t httpDefaultChanId = -1;
|
|
|
|
|
|
|
|
|
|
|
|
typedef struct SHttpModule {
|
|
|
|
typedef struct SHttpModule {
|
|
|
|
uv_loop_t* loop;
|
|
|
|
uv_loop_t* loop;
|
|
|
|
SAsyncPool* asyncPool;
|
|
|
|
SAsyncPool* asyncPool;
|
|
|
|
TdThread thread;
|
|
|
|
TdThread thread;
|
|
|
|
SHashObj* connStatusTable;
|
|
|
|
SHashObj* connStatusTable;
|
|
|
|
|
|
|
|
SHashObj* connPool;
|
|
|
|
int8_t quit;
|
|
|
|
int8_t quit;
|
|
|
|
|
|
|
|
int16_t connNum;
|
|
|
|
} SHttpModule;
|
|
|
|
} SHttpModule;
|
|
|
|
|
|
|
|
|
|
|
|
typedef struct SHttpMsg {
|
|
|
|
typedef struct SHttpMsg {
|
|
|
@ -46,7 +49,7 @@ typedef struct SHttpMsg {
|
|
|
|
int32_t len;
|
|
|
|
int32_t len;
|
|
|
|
EHttpCompFlag flag;
|
|
|
|
EHttpCompFlag flag;
|
|
|
|
int8_t quit;
|
|
|
|
int8_t quit;
|
|
|
|
|
|
|
|
int64_t chanId;
|
|
|
|
} SHttpMsg;
|
|
|
|
} SHttpMsg;
|
|
|
|
|
|
|
|
|
|
|
|
typedef struct SHttpClient {
|
|
|
|
typedef struct SHttpClient {
|
|
|
@ -58,50 +61,73 @@ typedef struct SHttpClient {
|
|
|
|
char* addr;
|
|
|
|
char* addr;
|
|
|
|
uint16_t port;
|
|
|
|
uint16_t port;
|
|
|
|
struct sockaddr_in dest;
|
|
|
|
struct sockaddr_in dest;
|
|
|
|
|
|
|
|
int64_t chanId;
|
|
|
|
} SHttpClient;
|
|
|
|
} SHttpClient;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
typedef struct SHttpConnList {
|
|
|
|
|
|
|
|
queue q;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} SHttpConnList;
|
|
|
|
|
|
|
|
|
|
|
|
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
|
|
|
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
|
|
|
static void transHttpEnvInit();
|
|
|
|
static void transHttpEnvInit();
|
|
|
|
|
|
|
|
|
|
|
|
static void httpHandleReq(SHttpMsg* msg);
|
|
|
|
static void httpHandleReq(SHttpMsg* msg);
|
|
|
|
static void httpHandleQuit(SHttpMsg* msg);
|
|
|
|
static void httpHandleQuit(SHttpMsg* msg);
|
|
|
|
static int32_t httpSendQuit();
|
|
|
|
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
|
|
|
|
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
|
|
|
|
|
|
|
|
static void httpDestroyMsg(SHttpMsg* msg);
|
|
|
|
|
|
|
|
|
|
|
|
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
|
|
|
|
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
|
|
|
|
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
|
|
|
|
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
|
|
|
|
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
EHttpCompFlag flag);
|
|
|
|
EHttpCompFlag flag);
|
|
|
|
|
|
|
|
static void httpModuleDestroy(SHttpModule* http);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
|
|
|
|
|
|
|
int32_t contLen, EHttpCompFlag flag, int64_t chanId);
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
|
|
|
|
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
|
|
|
|
EHttpCompFlag flag) {
|
|
|
|
|
|
|
|
|
|
|
|
EHttpCompFlag flag) {
|
|
|
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
int32_t len = 0;
|
|
|
|
if (flag == HTTP_FLAT) {
|
|
|
|
if (flag == HTTP_FLAT) {
|
|
|
|
return snprintf(pHead, headLen,
|
|
|
|
len = snprintf(pHead, headLen,
|
|
|
|
"POST %s HTTP/1.1\n"
|
|
|
|
"POST %s HTTP/1.1\n"
|
|
|
|
"Host: %s\n"
|
|
|
|
"Host: %s\n"
|
|
|
|
"Content-Type: application/json\n"
|
|
|
|
"Content-Type: application/json\n"
|
|
|
|
"Content-Length: %d\n\n",
|
|
|
|
"Content-Length: %d\n\n",
|
|
|
|
uri, server, contLen);
|
|
|
|
uri, server, contLen);
|
|
|
|
|
|
|
|
if (len < 0 || len >= headLen) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_RANGE;
|
|
|
|
|
|
|
|
}
|
|
|
|
} else if (flag == HTTP_GZIP) {
|
|
|
|
} else if (flag == HTTP_GZIP) {
|
|
|
|
return snprintf(pHead, headLen,
|
|
|
|
len = snprintf(pHead, headLen,
|
|
|
|
"POST %s HTTP/1.1\n"
|
|
|
|
"POST %s HTTP/1.1\n"
|
|
|
|
"Host: %s\n"
|
|
|
|
"Host: %s\n"
|
|
|
|
"Content-Type: application/json\n"
|
|
|
|
"Content-Type: application/json\n"
|
|
|
|
"Content-Encoding: gzip\n"
|
|
|
|
"Content-Encoding: gzip\n"
|
|
|
|
"Content-Length: %d\n\n",
|
|
|
|
"Content-Length: %d\n\n",
|
|
|
|
uri, server, contLen);
|
|
|
|
uri, server, contLen);
|
|
|
|
|
|
|
|
if (len < 0 || len >= headLen) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_RANGE;
|
|
|
|
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
terrno = TSDB_CODE_INVALID_CFG;
|
|
|
|
code = TSDB_CODE_INVALID_PARA;
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return code;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|
|
|
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|
|
|
int32_t code = -1;
|
|
|
|
int32_t code = 0;
|
|
|
|
int32_t destLen = srcLen;
|
|
|
|
int32_t destLen = srcLen;
|
|
|
|
void* pDest = taosMemoryMalloc(destLen);
|
|
|
|
void* pDest = taosMemoryMalloc(destLen);
|
|
|
|
|
|
|
|
|
|
|
|
if (pDest == NULL) {
|
|
|
|
if (pDest == NULL) {
|
|
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
code= TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -110,7 +136,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|
|
|
gzipStream.zfree = (free_func)0;
|
|
|
|
gzipStream.zfree = (free_func)0;
|
|
|
|
gzipStream.opaque = (voidpf)0;
|
|
|
|
gzipStream.opaque = (voidpf)0;
|
|
|
|
if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
|
|
|
if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
|
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -121,13 +147,13 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|
|
|
|
|
|
|
|
|
|
|
while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
|
|
|
|
while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
|
|
|
|
if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
|
|
|
|
if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
|
|
|
|
terrno = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
code = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (gzipStream.avail_in != 0) {
|
|
|
|
if (gzipStream.avail_in != 0) {
|
|
|
|
terrno = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
code = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -137,18 +163,18 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (err != Z_OK) {
|
|
|
|
if (err != Z_OK) {
|
|
|
|
terrno = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
code = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (deflateEnd(&gzipStream) != Z_OK) {
|
|
|
|
if (deflateEnd(&gzipStream) != Z_OK) {
|
|
|
|
terrno = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
code = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (gzipStream.total_out >= srcLen) {
|
|
|
|
if (gzipStream.total_out >= srcLen) {
|
|
|
|
terrno = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
code = TSDB_CODE_COMPRESS_ERROR;
|
|
|
|
goto _OVER;
|
|
|
|
goto _OVER;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -158,8 +184,7 @@ _OVER:
|
|
|
|
if (code == 0) {
|
|
|
|
if (code == 0) {
|
|
|
|
memcpy(pSrc, pDest, gzipStream.total_out);
|
|
|
|
memcpy(pSrc, pDest, gzipStream.total_out);
|
|
|
|
code = gzipStream.total_out;
|
|
|
|
code = gzipStream.total_out;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(pDest);
|
|
|
|
taosMemoryFree(pDest);
|
|
|
|
return code;
|
|
|
|
return code;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -168,11 +193,15 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port,
|
|
|
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
|
|
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
|
|
|
if (ip == 0xffffffff) {
|
|
|
|
if (ip == 0xffffffff) {
|
|
|
|
tError("http-report failed to resolving domain names: %s", server);
|
|
|
|
tError("http-report failed to resolving domain names: %s", server);
|
|
|
|
return -1;
|
|
|
|
return TSDB_CODE_RPC_FQDN_ERROR;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
char buf[128] = {0};
|
|
|
|
char buf[256] = {0};
|
|
|
|
tinet_ntoa(buf, ip);
|
|
|
|
tinet_ntoa(buf, ip);
|
|
|
|
uv_ip4_addr(buf, port, dest);
|
|
|
|
int ret = uv_ip4_addr(buf, port, dest);
|
|
|
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
|
|
|
tError("http-report failed to get addr %s", uv_err_name(ret));
|
|
|
|
|
|
|
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -183,6 +212,44 @@ static void* httpThread(void* arg) {
|
|
|
|
return NULL;
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
|
|
|
|
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
|
|
|
|
|
|
|
|
if (server == NULL || uri == NULL) {
|
|
|
|
|
|
|
|
tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId);
|
|
|
|
|
|
|
|
*httpMsg = NULL;
|
|
|
|
|
|
|
|
return TSDB_CODE_INVALID_PARA;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pCont == NULL || contLen == 0) {
|
|
|
|
|
|
|
|
tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId);
|
|
|
|
|
|
|
|
*httpMsg = NULL;
|
|
|
|
|
|
|
|
return TSDB_CODE_INVALID_PARA;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
|
|
|
|
|
|
|
if (msg == NULL) {
|
|
|
|
|
|
|
|
*httpMsg = NULL;
|
|
|
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg->port = port;
|
|
|
|
|
|
|
|
msg->server = taosStrdup(server);
|
|
|
|
|
|
|
|
msg->uri = taosStrdup(uri);
|
|
|
|
|
|
|
|
msg->cont = taosMemoryMalloc(contLen);
|
|
|
|
|
|
|
|
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
|
|
|
|
|
|
|
|
httpDestroyMsg(msg);
|
|
|
|
|
|
|
|
*httpMsg = NULL;
|
|
|
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
memcpy(msg->cont, pCont, contLen);
|
|
|
|
|
|
|
|
msg->len = contLen;
|
|
|
|
|
|
|
|
msg->flag = flag;
|
|
|
|
|
|
|
|
msg->quit = 0;
|
|
|
|
|
|
|
|
msg->chanId = chanId;
|
|
|
|
|
|
|
|
*httpMsg = msg;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
}
|
|
|
|
static void httpDestroyMsg(SHttpMsg* msg) {
|
|
|
|
static void httpDestroyMsg(SHttpMsg* msg) {
|
|
|
|
if (msg == NULL) return;
|
|
|
|
if (msg == NULL) return;
|
|
|
|
|
|
|
|
|
|
|
@ -191,15 +258,7 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
|
|
|
taosMemoryFree(msg->cont);
|
|
|
|
taosMemoryFree(msg->cont);
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
static void httpDestroyMsgWrapper(void* cont, void* param) {
|
|
|
|
static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); }
|
|
|
|
httpDestroyMsg((SHttpMsg*)cont);
|
|
|
|
|
|
|
|
// if (msg == NULL) return;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// taosMemoryFree(msg->server);
|
|
|
|
|
|
|
|
// taosMemoryFree(msg->uri);
|
|
|
|
|
|
|
|
// taosMemoryFree(msg->cont);
|
|
|
|
|
|
|
|
// taosMemoryFree(msg);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
|
|
|
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
|
|
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
|
|
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
|
|
@ -221,6 +280,7 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
|
|
|
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
|
|
|
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void httpAsyncCb(uv_async_t* handle) {
|
|
|
|
static void httpAsyncCb(uv_async_t* handle) {
|
|
|
|
SAsyncItem* item = handle->data;
|
|
|
|
SAsyncItem* item = handle->data;
|
|
|
|
SHttpModule* http = item->pThrd;
|
|
|
|
SHttpModule* http = item->pThrd;
|
|
|
@ -266,6 +326,14 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
|
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
|
|
|
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
|
|
|
SHttpClient* cli = handle->data;
|
|
|
|
SHttpClient* cli = handle->data;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t chanId = cli->chanId;
|
|
|
|
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, cli->chanId);
|
|
|
|
|
|
|
|
if (http != NULL) {
|
|
|
|
|
|
|
|
http->connNum -= 1;
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -278,7 +346,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested
|
|
|
|
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|
|
|
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|
|
|
SHttpClient* cli = handle->data;
|
|
|
|
SHttpClient* cli = handle->data;
|
|
|
|
if (nread < 0) {
|
|
|
|
if (nread < 0) {
|
|
|
|
tError("http-report recv error:%s", uv_err_name(nread));
|
|
|
|
tError("http-report recv error:%s", uv_strerror(nread));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
|
|
|
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -289,92 +357,73 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
|
|
|
|
static void clientSentCb(uv_write_t* req, int32_t status) {
|
|
|
|
static void clientSentCb(uv_write_t* req, int32_t status) {
|
|
|
|
SHttpClient* cli = req->data;
|
|
|
|
SHttpClient* cli = req->data;
|
|
|
|
if (status != 0) {
|
|
|
|
if (status != 0) {
|
|
|
|
tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
|
|
|
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
|
|
|
|
|
|
|
cli->port, cli->chanId);
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
tTrace("http-report succ to send data");
|
|
|
|
tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
|
|
|
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
|
|
|
if (status != 0) {
|
|
|
|
if (status != 0) {
|
|
|
|
tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
|
|
|
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
|
|
|
|
|
|
|
cli->port, cli->chanId);
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
static void clientConnCb(uv_connect_t* req, int32_t status) {
|
|
|
|
static void clientConnCb(uv_connect_t* req, int32_t status) {
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
SHttpClient* cli = req->data;
|
|
|
|
SHttpClient* cli = req->data;
|
|
|
|
|
|
|
|
int64_t chanId = cli->chanId;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
|
|
|
if (status != 0) {
|
|
|
|
if (status != 0) {
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
|
|
|
|
|
|
|
|
|
|
|
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
|
|
|
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status),
|
|
|
|
|
|
|
|
cli->addr, cli->port, chanId);
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
http->connNum += 1;
|
|
|
|
|
|
|
|
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
|
|
|
|
|
|
|
|
|
|
|
|
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
|
|
|
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
|
|
|
if (0 != status) {
|
|
|
|
if (0 != status) {
|
|
|
|
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
|
|
|
tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
|
|
|
|
|
|
|
cli->port, chanId);
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int32_t httpSendQuit() {
|
|
|
|
int32_t httpSendQuit(SHttpModule* http, int64_t chanId) {
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
if (http == NULL) return 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
|
|
|
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
|
|
|
|
|
|
|
if (msg == NULL) {
|
|
|
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
msg->quit = 1;
|
|
|
|
msg->quit = 1;
|
|
|
|
|
|
|
|
msg->chanId = chanId;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int ret = transAsyncSend(http->asyncPool, &(msg->q));
|
|
|
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
|
|
|
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
transAsyncSend(http->asyncPool, &(msg->q));
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
|
|
|
|
EHttpCompFlag flag) {
|
|
|
|
|
|
|
|
if (server == NULL || uri == NULL) {
|
|
|
|
|
|
|
|
tError("http-report failed to report to invalid addr");
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (pCont == NULL || contLen == 0) {
|
|
|
|
|
|
|
|
tError("http-report failed to report empty packet");
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
if (load == NULL) {
|
|
|
|
|
|
|
|
tError("http-report already released");
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg->server = taosStrdup(server);
|
|
|
|
|
|
|
|
msg->uri = taosStrdup(uri);
|
|
|
|
|
|
|
|
msg->port = port;
|
|
|
|
|
|
|
|
msg->cont = taosMemoryMalloc(contLen);
|
|
|
|
|
|
|
|
memcpy(msg->cont, pCont, contLen);
|
|
|
|
|
|
|
|
msg->len = contLen;
|
|
|
|
|
|
|
|
msg->flag = flag;
|
|
|
|
|
|
|
|
msg->quit = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int ret = transAsyncSend(load->asyncPool, &(msg->q));
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void httpDestroyClientCb(uv_handle_t* handle) {
|
|
|
|
static void httpDestroyClientCb(uv_handle_t* handle) {
|
|
|
|
SHttpClient* http = handle->data;
|
|
|
|
SHttpClient* http = handle->data;
|
|
|
|
destroyHttpClient(http);
|
|
|
|
destroyHttpClient(http);
|
|
|
@ -392,13 +441,14 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
static void httpHandleQuit(SHttpMsg* msg) {
|
|
|
|
static void httpHandleQuit(SHttpMsg* msg) {
|
|
|
|
|
|
|
|
int64_t chanId = msg->chanId;
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
|
|
|
|
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
tDebug("http-report receive quit, chanId:%" PRId64 "", chanId);
|
|
|
|
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
|
|
|
if (http == NULL) return;
|
|
|
|
if (http == NULL) return;
|
|
|
|
|
|
|
|
|
|
|
|
uv_walk(http->loop, httpWalkCb, NULL);
|
|
|
|
uv_walk(http->loop, httpWalkCb, NULL);
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
|
|
|
|
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
|
|
|
@ -431,9 +481,14 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port,
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
static void httpHandleReq(SHttpMsg* msg) {
|
|
|
|
static void httpHandleReq(SHttpMsg* msg) {
|
|
|
|
int32_t ignore = false;
|
|
|
|
int64_t chanId = msg->chanId;
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
int32_t ignore = false;
|
|
|
|
|
|
|
|
char* header = NULL;
|
|
|
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
|
|
|
if (http == NULL) {
|
|
|
|
if (http == NULL) {
|
|
|
|
|
|
|
|
code = terrno;
|
|
|
|
goto END;
|
|
|
|
goto END;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
|
|
|
|
if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
|
|
|
@ -453,123 +508,268 @@ static void httpHandleReq(SHttpMsg* msg) {
|
|
|
|
msg->flag = HTTP_FLAT;
|
|
|
|
msg->flag = HTTP_FLAT;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (dstLen < 0) {
|
|
|
|
if (dstLen < 0) {
|
|
|
|
|
|
|
|
code = dstLen;
|
|
|
|
goto END;
|
|
|
|
goto END;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int32_t len = 2048;
|
|
|
|
int32_t cap = 2048;
|
|
|
|
char* header = taosMemoryCalloc(1, len);
|
|
|
|
header = taosMemoryCalloc(1, cap);
|
|
|
|
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag);
|
|
|
|
if (header == NULL) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto END;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag);
|
|
|
|
if (headLen < 0) {
|
|
|
|
if (headLen < 0) {
|
|
|
|
taosMemoryFree(header);
|
|
|
|
code = headLen;
|
|
|
|
goto END;
|
|
|
|
goto END;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
|
|
|
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
|
|
|
|
|
|
|
if (wb == NULL) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto END;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
|
|
|
|
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
|
|
|
|
wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var
|
|
|
|
wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var
|
|
|
|
|
|
|
|
|
|
|
|
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
|
|
|
|
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
|
|
|
|
|
|
|
|
if (cli == NULL) {
|
|
|
|
|
|
|
|
taosMemoryFree(wb);
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto END;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
cli->conn.data = cli;
|
|
|
|
cli->conn.data = cli;
|
|
|
|
cli->tcp.data = cli;
|
|
|
|
cli->tcp.data = cli;
|
|
|
|
cli->req.data = cli;
|
|
|
|
cli->req.data = cli;
|
|
|
|
cli->wbuf = wb;
|
|
|
|
cli->dest = dest;
|
|
|
|
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
|
|
|
|
cli->chanId = chanId;
|
|
|
|
cli->addr = msg->server;
|
|
|
|
cli->addr = msg->server;
|
|
|
|
cli->port = msg->port;
|
|
|
|
cli->port = msg->port;
|
|
|
|
cli->dest = dest;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(msg->uri);
|
|
|
|
taosMemoryFree(msg->uri);
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
taosMemoryFree(msg);
|
|
|
|
|
|
|
|
|
|
|
|
uv_tcp_init(http->loop, &cli->tcp);
|
|
|
|
cli->wbuf = wb;
|
|
|
|
|
|
|
|
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
|
|
|
|
|
|
|
|
if (cli->rbuf == NULL) {
|
|
|
|
|
|
|
|
tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port,
|
|
|
|
|
|
|
|
chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
|
|
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int err = uv_tcp_init(http->loop, &cli->tcp);
|
|
|
|
|
|
|
|
if (err != 0) {
|
|
|
|
|
|
|
|
tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port,
|
|
|
|
|
|
|
|
chanId, uv_strerror(err));
|
|
|
|
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// set up timeout to avoid stuck;
|
|
|
|
// set up timeout to avoid stuck;
|
|
|
|
int32_t fd = taosCreateSocketWithTimeout(5000);
|
|
|
|
int32_t fd = taosCreateSocketWithTimeout(5000);
|
|
|
|
if (fd < 0) {
|
|
|
|
if (fd < 0) {
|
|
|
|
tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port);
|
|
|
|
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId,
|
|
|
|
|
|
|
|
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
|
|
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
|
|
|
if (ret != 0) {
|
|
|
|
if (ret != 0) {
|
|
|
|
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
|
|
|
tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret),
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
cli->addr, cli->port, chanId, uv_strerror(ret));
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
|
|
|
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
|
|
|
if (ret != 0) {
|
|
|
|
if (ret != 0) {
|
|
|
|
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
|
|
|
|
tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port,
|
|
|
|
cli->port);
|
|
|
|
chanId, uv_strerror(ret));
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
|
|
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
destroyHttpClient(cli);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
|
|
END:
|
|
|
|
END:
|
|
|
|
if (ignore == false) {
|
|
|
|
if (ignore == false) {
|
|
|
|
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
|
|
|
|
tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId,
|
|
|
|
|
|
|
|
tstrerror(code));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
httpDestroyMsg(msg);
|
|
|
|
httpDestroyMsg(msg);
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
taosMemoryFree(header);
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void httpModuleDestroy(SHttpModule* http) {
|
|
|
|
|
|
|
|
if (http == NULL) return;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (http->asyncPool != NULL) {
|
|
|
|
|
|
|
|
TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
|
|
|
|
|
|
|
|
transAsyncPoolDestroy(http->asyncPool);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (http->loop) {
|
|
|
|
|
|
|
|
uv_loop_close(http->loop);
|
|
|
|
|
|
|
|
taosMemoryFree(http->loop);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(http->connStatusTable);
|
|
|
|
|
|
|
|
// not free http, http freeed by ref
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void httpModuleDestroy2(SHttpModule* http) {
|
|
|
|
|
|
|
|
if (http == NULL) return;
|
|
|
|
|
|
|
|
httpModuleDestroy(http);
|
|
|
|
|
|
|
|
taosMemoryFree(http);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
|
|
|
|
|
|
|
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
|
|
|
|
|
|
|
|
SHttpModule* load = NULL;
|
|
|
|
|
|
|
|
SHttpMsg *msg = NULL;
|
|
|
|
|
|
|
|
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg);
|
|
|
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
load = taosAcquireRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
if (load == NULL) {
|
|
|
|
|
|
|
|
code = terrno;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (atomic_load_8(&load->quit)) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_HTTP_MODULE_QUIT;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
code = transAsyncSend(load->asyncPool, &(msg->q));
|
|
|
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_HTTP_MODULE_QUIT;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
msg = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_ERROR:
|
|
|
|
|
|
|
|
httpDestroyMsg(msg);
|
|
|
|
|
|
|
|
if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
|
|
|
|
EHttpCompFlag flag, int64_t chanId) {
|
|
|
|
|
|
|
|
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
|
|
|
EHttpCompFlag flag) {
|
|
|
|
EHttpCompFlag flag) {
|
|
|
|
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
|
|
|
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
|
|
|
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
|
|
|
|
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
|
|
|
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t transInitHttpChanImpl();
|
|
|
|
|
|
|
|
|
|
|
|
static void transHttpEnvInit() {
|
|
|
|
static void transHttpEnvInit() {
|
|
|
|
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
|
|
|
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
|
|
|
|
|
|
|
|
httpDefaultChanId = transInitHttpChanImpl();
|
|
|
|
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
|
|
|
|
|
|
|
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
|
|
|
|
|
|
|
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
|
|
|
|
|
|
|
http->quit = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
uv_loop_init(http->loop);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
|
|
|
|
|
|
|
if (NULL == http->asyncPool) {
|
|
|
|
|
|
|
|
taosMemoryFree(http->loop);
|
|
|
|
|
|
|
|
taosMemoryFree(http);
|
|
|
|
|
|
|
|
http = NULL;
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
|
|
|
|
|
|
|
if (err != 0) {
|
|
|
|
|
|
|
|
taosMemoryFree(http->loop);
|
|
|
|
|
|
|
|
taosMemoryFree(http);
|
|
|
|
|
|
|
|
http = NULL;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
httpRef = taosAddRef(httpRefMgt, http);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void transHttpEnvDestroy() {
|
|
|
|
void transHttpEnvDestroy() {
|
|
|
|
// remove http
|
|
|
|
// remove default chanId
|
|
|
|
if (httpRef == -1) {
|
|
|
|
taosDestroyHttpChan(httpDefaultChanId);
|
|
|
|
|
|
|
|
httpDefaultChanId = -1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t transInitHttpChanImpl() {
|
|
|
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
|
|
|
|
|
|
|
|
if (http == NULL) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
|
|
|
|
|
|
|
if (http->connStatusTable == NULL) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
|
|
|
|
|
|
|
if (http->loop == NULL) {
|
|
|
|
|
|
|
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int err = uv_loop_init(http->loop);
|
|
|
|
|
|
|
|
if (err != 0) {
|
|
|
|
|
|
|
|
tError("http-report failed init uv, reason:%s", uv_strerror(err));
|
|
|
|
|
|
|
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
|
|
|
|
|
|
|
if (http->asyncPool == NULL) {
|
|
|
|
|
|
|
|
code = terrno;
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
http->quit = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
|
|
|
|
|
|
|
if (err != 0) {
|
|
|
|
|
|
|
|
code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t ref = taosAddRef(httpRefMgt, http);
|
|
|
|
|
|
|
|
if (ref < 0) {
|
|
|
|
|
|
|
|
goto _ERROR;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return ref;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_ERROR:
|
|
|
|
|
|
|
|
httpModuleDestroy2(http);
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
int64_t taosInitHttpChan() {
|
|
|
|
|
|
|
|
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
|
|
|
|
|
|
|
return transInitHttpChanImpl();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void taosDestroyHttpChan(int64_t chanId) {
|
|
|
|
|
|
|
|
tDebug("http-report send quit, chanId:%" PRId64 "", chanId);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int ret = 0;
|
|
|
|
|
|
|
|
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
if (load == NULL) {
|
|
|
|
|
|
|
|
tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno));
|
|
|
|
|
|
|
|
ret = terrno;
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
atomic_store_8(&load->quit, 1);
|
|
|
|
atomic_store_8(&load->quit, 1);
|
|
|
|
httpSendQuit();
|
|
|
|
ret = httpSendQuit(load, chanId);
|
|
|
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
|
|
|
tDebug("http-report already destroyed, chanId %" PRId64 ",reason:%s", chanId, tstrerror(ret));
|
|
|
|
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
taosThreadJoin(load->thread, NULL);
|
|
|
|
taosThreadJoin(load->thread, NULL);
|
|
|
|
|
|
|
|
|
|
|
|
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
|
|
|
|
httpModuleDestroy(load);
|
|
|
|
transAsyncPoolDestroy(load->asyncPool);
|
|
|
|
|
|
|
|
uv_loop_close(load->loop);
|
|
|
|
|
|
|
|
taosMemoryFree(load->loop);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(load->connStatusTable);
|
|
|
|
taosReleaseRef(httpRefMgt, chanId);
|
|
|
|
|
|
|
|
taosRemoveRef(httpRefMgt, chanId);
|
|
|
|
taosReleaseRef(httpRefMgt, httpRef);
|
|
|
|
}
|
|
|
|
taosRemoveRef(httpRefMgt, httpRef);
|
|
|
|
|
|
|
|
}
|
|
|
|
|