ehn/TD-31977-move-qid-to-header

This commit is contained in:
dmchen 2024-09-10 09:49:36 +00:00
parent f01d166b3c
commit c6d8bd7333
4 changed files with 72 additions and 36 deletions

View File

@ -28,9 +28,11 @@ typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid);
int64_t taosInitHttpChan();
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId);
EHttpCompFlag flag, int64_t chanId, const char* qid);
void taosDestroyHttpChan(int64_t chanId);
#ifdef __cplusplus

View File

@ -768,9 +768,10 @@ void monSendPromReport() {
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
char tmp[100] = {0};
(void)sprintf(tmp, "%s?qid=%" PRId64, tsMonFwUri, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont to %s", tmp);
if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
(void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont with QID:%s", tmp);
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
tmp) != 0) {
uError("failed to send monitor msg");
} else {
(void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT);

View File

@ -602,9 +602,10 @@ void monSendReport(SMonInfo *pMonitor) {
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
char tmp[100] = {0};
(void)sprintf(tmp, "%s?qid=%" PRId64, tsMonUri, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont to %s", tmp);
if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
(void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont with QID:%s", tmp);
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
tmp) != 0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
@ -623,9 +624,10 @@ void monSendReportBasic(SMonInfo *pMonitor) {
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
char tmp[100] = {0};
(void)sprintf(tmp, "%s?qid=%" PRId64, tsMonFwBasicUri, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont basic to %s", tmp);
if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
(void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId));
uDebug("report cont basic with QID:%s", tmp);
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
tmp) != 0) {
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
@ -677,10 +679,11 @@ void monSendContent(char *pCont, const char *uri) {
}
if (pCont != NULL) {
char tmp[100] = {0};
(void)sprintf(tmp, "%s?qid=%" PRId64, uri, tGenQid64(tsMonitor.dnodeId));
uInfoL("report client cont to %s", tmp);
(void)sprintf(tmp, "%" PRId64, tGenQid64(tsMonitor.dnodeId));
uInfoL("report client cont with QID:%s", tmp);
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tmp, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, tmp) !=
0) {
uError("failed to send monitor msg");
}
}

View File

@ -53,6 +53,7 @@ typedef struct SHttpMsg {
int8_t quit;
int64_t chanId;
int64_t seq;
char* qid;
} SHttpMsg;
typedef struct SHttpClient {
@ -81,7 +82,7 @@ static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg);
static void httpDestroyMsg(SHttpMsg* msg);
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
@ -91,31 +92,53 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1
static void httpModuleDestroy(SHttpModule* http);
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId);
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid);
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead,
int32_t headLen,
EHttpCompFlag flag) {
int32_t code = 0;
int32_t len = 0;
if (flag == HTTP_FLAT) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
uri, server, contLen);
if (qid == NULL) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
uri, server, contLen);
} else {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"X-QID: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
uri, server, qid, contLen);
}
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
} else if (flag == HTTP_GZIP) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
uri, server, contLen);
if (qid == NULL) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
uri, server, contLen);
} else {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"X-QID: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
uri, server, qid, contLen);
}
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
@ -218,7 +241,7 @@ static void* httpThread(void* arg) {
}
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg) {
int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
if (server == NULL || uri == NULL) {
tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
@ -243,6 +266,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port,
msg->server = taosStrdup(server);
msg->uri = taosStrdup(uri);
msg->cont = taosMemoryMalloc(contLen);
if (qid != NULL) msg->qid = taosStrdup(qid);
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
httpDestroyMsg(msg);
*httpMsg = NULL;
@ -263,6 +287,7 @@ static void httpDestroyMsg(SHttpMsg* msg) {
taosMemoryFree(msg->server);
taosMemoryFree(msg->uri);
taosMemoryFree(msg->cont);
taosMemoryFree(msg->qid);
taosMemoryFree(msg);
}
static void httpDestroyMsgWrapper(void* cont, void* param) {
@ -561,7 +586,7 @@ static void httpHandleReq(SHttpMsg* msg) {
goto END;
}
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, msg->qid, header, cap, msg->flag);
if (headLen < 0) {
code = headLen;
goto END;
@ -675,10 +700,10 @@ void httpModuleDestroy2(SHttpModule* http) {
}
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) {
SHttpModule* load = NULL;
SHttpMsg* msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg);
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
if (code != 0) {
goto _ERROR;
}
@ -714,14 +739,19 @@ _ERROR:
}
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId) {
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId);
EHttpCompFlag flag, int64_t chanId, const char* qid) {
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId, qid);
}
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
return taosSendHttpReportWithQID(server, uri, port, pCont, contLen, flag, NULL);
}
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, const char* qid) {
(void)taosThreadOnce(&transHttpInit, transHttpEnvInit);
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId);
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid);
}
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }