fix compile error

This commit is contained in:
Yihao Deng 2024-07-12 02:54:59 +00:00
parent 1b052eefd0
commit d5eb7202f3
1 changed files with 64 additions and 51 deletions

View File

@ -76,8 +76,8 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId); EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
static void httpDestroyMsg(SHttpMsg* msg); static void httpDestroyMsg(SHttpMsg* msg);
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
@ -90,35 +90,44 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
int32_t contLen, EHttpCompFlag flag, int64_t chanId); int32_t contLen, EHttpCompFlag flag, int64_t chanId);
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
EHttpCompFlag flag) {
int32_t code = 0;
int32_t len = 0;
if (flag == HTTP_FLAT) { if (flag == HTTP_FLAT) {
return snprintf(pHead, headLen, len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n" "POST %s HTTP/1.1\n"
"Host: %s\n" "Host: %s\n"
"Content-Type: application/json\n" "Content-Type: application/json\n"
"Content-Length: %d\n\n", "Content-Length: %d\n\n",
uri, server, contLen); uri, server, contLen);
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
} else if (flag == HTTP_GZIP) { } else if (flag == HTTP_GZIP) {
return snprintf(pHead, headLen, len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n" "POST %s HTTP/1.1\n"
"Host: %s\n" "Host: %s\n"
"Content-Type: application/json\n" "Content-Type: application/json\n"
"Content-Encoding: gzip\n" "Content-Encoding: gzip\n"
"Content-Length: %d\n\n", "Content-Length: %d\n\n",
uri, server, contLen); uri, server, contLen);
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
} else { } else {
terrno = TSDB_CODE_INVALID_CFG; return TSDB_CODE_INVALID_PARA;
return terrno;
} }
return 0;
} }
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
int32_t code = -1; int32_t code = 0;
int32_t destLen = srcLen; int32_t destLen = srcLen;
void* pDest = taosMemoryMalloc(destLen); void* pDest = taosMemoryMalloc(destLen);
if (pDest == NULL) { if (pDest == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code= TSDB_CODE_OUT_OF_MEMORY;
goto _OVER; goto _OVER;
} }
@ -127,7 +136,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
gzipStream.zfree = (free_func)0; gzipStream.zfree = (free_func)0;
gzipStream.opaque = (voidpf)0; gzipStream.opaque = (voidpf)0;
if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER; goto _OVER;
} }
@ -138,13 +147,13 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) { while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) { if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
terrno = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _OVER; goto _OVER;
} }
} }
if (gzipStream.avail_in != 0) { if (gzipStream.avail_in != 0) {
terrno = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _OVER; goto _OVER;
} }
@ -154,18 +163,18 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
break; break;
} }
if (err != Z_OK) { if (err != Z_OK) {
terrno = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _OVER; goto _OVER;
} }
} }
if (deflateEnd(&gzipStream) != Z_OK) { if (deflateEnd(&gzipStream) != Z_OK) {
terrno = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _OVER; goto _OVER;
} }
if (gzipStream.total_out >= srcLen) { if (gzipStream.total_out >= srcLen) {
terrno = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _OVER; goto _OVER;
} }
@ -175,10 +184,7 @@ _OVER:
if (code == 0) { if (code == 0) {
memcpy(pSrc, pDest, gzipStream.total_out); memcpy(pSrc, pDest, gzipStream.total_out);
code = gzipStream.total_out; code = gzipStream.total_out;
} else { }
code = terrno;
}
taosMemoryFree(pDest); taosMemoryFree(pDest);
return code; return code;
} }
@ -206,24 +212,24 @@ static void* httpThread(void* arg) {
return NULL; return NULL;
} }
static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId) { EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
if (server == NULL || uri == NULL) { if (server == NULL || uri == NULL) {
tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId);
terrno = TSDB_CODE_INVALID_PARA; *httpMsg = NULL;
return NULL; return TSDB_CODE_INVALID_PARA;
} }
if (pCont == NULL || contLen == 0) { if (pCont == NULL || contLen == 0) {
tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId);
terrno = TSDB_CODE_INVALID_PARA; *httpMsg = NULL;
return NULL; return TSDB_CODE_INVALID_PARA;
} }
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
if (msg == NULL) { if (msg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; *httpMsg = NULL;
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
msg->port = port; msg->port = port;
@ -232,8 +238,8 @@ static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t por
msg->cont = taosMemoryMalloc(contLen); msg->cont = taosMemoryMalloc(contLen);
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
httpDestroyMsg(msg); httpDestroyMsg(msg);
terrno = TSDB_CODE_OUT_OF_MEMORY; *httpMsg = NULL;
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
memcpy(msg->cont, pCont, contLen); memcpy(msg->cont, pCont, contLen);
@ -241,7 +247,8 @@ static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t por
msg->flag = flag; msg->flag = flag;
msg->quit = 0; msg->quit = 0;
msg->chanId = chanId; msg->chanId = chanId;
return msg; *httpMsg = msg;
return 0;
} }
static void httpDestroyMsg(SHttpMsg* msg) { static void httpDestroyMsg(SHttpMsg* msg) {
if (msg == NULL) return; if (msg == NULL) return;
@ -477,10 +484,11 @@ static void httpHandleReq(SHttpMsg* msg) {
int64_t chanId = msg->chanId; int64_t chanId = msg->chanId;
int32_t ignore = false; int32_t ignore = false;
char* header = NULL; char* header = NULL;
terrno = 0; int32_t code = 0;
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (http == NULL) { if (http == NULL) {
code = terrno;
goto END; goto END;
} }
if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) {
@ -500,6 +508,7 @@ static void httpHandleReq(SHttpMsg* msg) {
msg->flag = HTTP_FLAT; msg->flag = HTTP_FLAT;
} }
if (dstLen < 0) { if (dstLen < 0) {
code = dstLen;
goto END; goto END;
} }
} }
@ -507,18 +516,19 @@ static void httpHandleReq(SHttpMsg* msg) {
int32_t cap = 2048; int32_t cap = 2048;
header = taosMemoryCalloc(1, cap); header = taosMemoryCalloc(1, cap);
if (header == NULL) { if (header == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END; goto END;
} }
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag);
if (headLen < 0 || headLen >= cap) { if (headLen < 0) {
code = headLen;
goto END; goto END;
} }
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
if (wb == NULL) { if (wb == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END; goto END;
} }
@ -528,7 +538,7 @@ static void httpHandleReq(SHttpMsg* msg) {
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
if (cli == NULL) { if (cli == NULL) {
taosMemoryFree(wb); taosMemoryFree(wb);
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END; goto END;
} }
@ -593,9 +603,8 @@ static void httpHandleReq(SHttpMsg* msg) {
END: END:
if (ignore == false) { if (ignore == false) {
tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId,
tstrerror(terrno)); tstrerror(code));
} }
terrno = 0;
httpDestroyMsg(msg); httpDestroyMsg(msg);
taosMemoryFree(header); taosMemoryFree(header);
taosReleaseRef(httpRefMgt, chanId); taosReleaseRef(httpRefMgt, chanId);
@ -625,26 +634,27 @@ void httpModuleDestroy2(SHttpModule* http) {
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId) { int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
int32_t ret = 0;
SHttpModule* load = NULL; SHttpModule* load = NULL;
SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); SHttpMsg *msg = NULL;
if (msg == NULL) { int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg);
if (code != 0) {
goto _ERROR; goto _ERROR;
} }
load = taosAcquireRef(httpRefMgt, chanId); load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) { if (load == NULL) {
code = terrno;
goto _ERROR; goto _ERROR;
} }
if (atomic_load_8(&load->quit)) { if (atomic_load_8(&load->quit)) {
terrno = TSDB_CODE_HTTP_MODULE_QUIT; code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR; goto _ERROR;
} }
ret = transAsyncSend(load->asyncPool, &(msg->q)); code = transAsyncSend(load->asyncPool, &(msg->q));
if (ret < 0) { if (code != 0) {
terrno = TSDB_CODE_HTTP_MODULE_QUIT; code = TSDB_CODE_HTTP_MODULE_QUIT;
goto _ERROR; goto _ERROR;
} }
msg = NULL; msg = NULL;
@ -652,7 +662,7 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
_ERROR: _ERROR:
httpDestroyMsg(msg); httpDestroyMsg(msg);
if (load != NULL) taosReleaseRef(httpRefMgt, chanId); if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
return ret = terrno; return code;
} }
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
@ -682,33 +692,35 @@ void transHttpEnvDestroy() {
} }
int64_t transInitHttpChanImpl() { int64_t transInitHttpChanImpl() {
int32_t code = 0;
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
if (http == NULL) { if (http == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR; goto _ERROR;
} }
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (http->connStatusTable == NULL) { if (http->connStatusTable == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR; goto _ERROR;
} }
http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (http->loop == NULL) { if (http->loop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR; goto _ERROR;
} }
int err = uv_loop_init(http->loop); int err = uv_loop_init(http->loop);
if (err != 0) { if (err != 0) {
tError("http-report failed init uv, reason:%s", uv_strerror(err)); tError("http-report failed init uv, reason:%s", uv_strerror(err));
terrno = TSDB_CODE_THIRDPARTY_ERROR; code = TSDB_CODE_THIRDPARTY_ERROR;
goto _ERROR; goto _ERROR;
} }
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (http->asyncPool == NULL) { if (http->asyncPool == NULL) {
code = terrno;
goto _ERROR; goto _ERROR;
} }
@ -716,7 +728,7 @@ int64_t transInitHttpChanImpl() {
err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) { if (err != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _ERROR; goto _ERROR;
} }
@ -728,7 +740,7 @@ int64_t transInitHttpChanImpl() {
_ERROR: _ERROR:
httpModuleDestroy2(http); httpModuleDestroy2(http);
return terrno; return code;
} }
int64_t taosInitHttpChan() { int64_t taosInitHttpChan() {
taosThreadOnce(&transHttpInit, transHttpEnvInit); taosThreadOnce(&transHttpInit, transHttpEnvInit);
@ -742,6 +754,7 @@ void taosDestroyHttpChan(int64_t chanId) {
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) { if (load == NULL) {
tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno)); tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno));
ret = terrno;
return; return;
} }