diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index a2f6b5ac8b..9f635b8523 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -28,9 +28,11 @@ typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid); int64_t taosInitHttpChan(); int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId); + EHttpCompFlag flag, int64_t chanId, const char* qid); void taosDestroyHttpChan(int64_t chanId); #ifdef __cplusplus diff --git a/source/libs/monitor/src/monFramework.c b/source/libs/monitor/src/monFramework.c index fe32754160..98de5eae57 100644 --- a/source/libs/monitor/src/monFramework.c +++ b/source/libs/monitor/src/monFramework.c @@ -768,9 +768,10 @@ void monSendPromReport() { if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; char tmp[100] = {0}; - (void)sprintf(tmp, "%s?qid=%" PRId64, tsMonFwUri, tGenQid64(tsMonitor.dnodeId)); - uDebug("report cont to %s", tmp); - if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + (void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); } else { (void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT); diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 84d0806c4f..fd70cd29ef 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -602,9 +602,10 @@ void monSendReport(SMonInfo *pMonitor) { if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; char tmp[100] = {0}; - (void)sprintf(tmp, "%s?qid=%" PRId64, tsMonUri, tGenQid64(tsMonitor.dnodeId)); - uDebug("report cont to %s", tmp); - if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + (void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); } taosMemoryFree(pCont); @@ -623,9 +624,10 @@ void monSendReportBasic(SMonInfo *pMonitor) { if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; char tmp[100] = {0}; - (void)sprintf(tmp, "%s?qid=%" PRId64, tsMonFwBasicUri, tGenQid64(tsMonitor.dnodeId)); - uDebug("report cont basic to %s", tmp); - if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + (void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont basic with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); } taosMemoryFree(pCont); @@ -677,10 +679,11 @@ void monSendContent(char *pCont, const char *uri) { } if (pCont != NULL) { char tmp[100] = {0}; - (void)sprintf(tmp, "%s?qid=%" PRId64, uri, tGenQid64(tsMonitor.dnodeId)); - uInfoL("report client cont to %s", tmp); + (void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId)); + uInfoL("report client cont with QID:%s", tmp); EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, tmp) != + 0) { uError("failed to send monitor msg"); } } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index d19877dbf1..3f04d8577c 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -53,6 +53,7 @@ typedef struct SHttpMsg { int8_t quit; int64_t chanId; int64_t seq; + char* qid; } SHttpMsg; typedef struct SHttpClient { @@ -81,7 +82,7 @@ static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); + EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg); static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); @@ -91,31 +92,53 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 static void httpModuleDestroy(SHttpModule* http); static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, - int32_t contLen, EHttpCompFlag flag, int64_t chanId); + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid); -static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, +static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead, + int32_t headLen, EHttpCompFlag flag) { int32_t code = 0; int32_t len = 0; if (flag == HTTP_FLAT) { - len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - uri, server, contLen); + if (qid == NULL) { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, contLen); + } else { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "X-QID: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, qid, contLen); + } if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } } else if (flag == HTTP_GZIP) { - len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Encoding: gzip\n" - "Content-Length: %d\n\n", - uri, server, contLen); + if (qid == NULL) { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, contLen); + } else { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "X-QID: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, qid, contLen); + } if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } @@ -218,7 +241,7 @@ static void* httpThread(void* arg) { } static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { + EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg) { int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1); if (server == NULL || uri == NULL) { tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); @@ -243,6 +266,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, msg->server = taosStrdup(server); msg->uri = taosStrdup(uri); msg->cont = taosMemoryMalloc(contLen); + if (qid != NULL) msg->qid = taosStrdup(qid); if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { httpDestroyMsg(msg); *httpMsg = NULL; @@ -263,6 +287,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->server); taosMemoryFree(msg->uri); taosMemoryFree(msg->cont); + taosMemoryFree(msg->qid); taosMemoryFree(msg); } static void httpDestroyMsgWrapper(void* cont, void* param) { @@ -561,7 +586,7 @@ static void httpHandleReq(SHttpMsg* msg) { goto END; } - int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, msg->qid, header, cap, msg->flag); if (headLen < 0) { code = headLen; goto END; @@ -675,10 +700,10 @@ void httpModuleDestroy2(SHttpModule* http) { } static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, - int32_t contLen, EHttpCompFlag flag, int64_t chanId) { + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) { SHttpModule* load = NULL; SHttpMsg* msg = NULL; - int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg); + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg); if (code != 0) { goto _ERROR; } @@ -714,14 +739,19 @@ _ERROR: } int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId) { - return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId); + EHttpCompFlag flag, int64_t chanId, const char* qid) { + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId, qid); } int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + return taosSendHttpReportWithQID(server, uri, port, pCont, contLen, flag, NULL); +} + +int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid) { (void)taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId); + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }