Merge pull request #27765 from taosdata/enh/TD-31977-monitor-qid
enh/TD-31977-monitor-qid
This commit is contained in:
commit
6f43830bb2
|
@ -49,6 +49,7 @@ typedef struct {
|
||||||
} SAuditRecord;
|
} SAuditRecord;
|
||||||
|
|
||||||
int32_t auditInit(const SAuditCfg *pCfg);
|
int32_t auditInit(const SAuditCfg *pCfg);
|
||||||
|
void auditSetDnodeId(int32_t dnodeId);
|
||||||
void auditCleanup();
|
void auditCleanup();
|
||||||
int32_t auditSend(SJson *pJson);
|
int32_t auditSend(SJson *pJson);
|
||||||
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
|
||||||
|
|
|
@ -218,6 +218,7 @@ typedef struct {
|
||||||
} SDmNotifyHandle;
|
} SDmNotifyHandle;
|
||||||
|
|
||||||
int32_t monInit(const SMonCfg *pCfg);
|
int32_t monInit(const SMonCfg *pCfg);
|
||||||
|
void monSetDnodeId(int32_t dnodeId);
|
||||||
void monInitVnode();
|
void monInitVnode();
|
||||||
void monCleanup();
|
void monCleanup();
|
||||||
void monRecordLog(int64_t ts, ELogLevel level, const char *content);
|
void monRecordLog(int64_t ts, ELogLevel level, const char *content);
|
||||||
|
|
|
@ -28,9 +28,11 @@ typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
|
||||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag);
|
EHttpCompFlag flag);
|
||||||
|
|
||||||
|
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
|
EHttpCompFlag flag, const char* qid);
|
||||||
int64_t taosInitHttpChan();
|
int64_t taosInitHttpChan();
|
||||||
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag, int64_t chanId);
|
EHttpCompFlag flag, int64_t chanId, const char* qid);
|
||||||
void taosDestroyHttpChan(int64_t chanId);
|
void taosDestroyHttpChan(int64_t chanId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -37,3 +37,14 @@ int32_t tGenIdPI32(void);
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int64_t tGenIdPI64(void);
|
int64_t tGenIdPI64(void);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate an qid
|
||||||
|
*+------------+-----+-----------+---------------+
|
||||||
|
*| nodeid| 0| serial number | 0 |
|
||||||
|
*+------------+-----+-----------+---------------+
|
||||||
|
*| 8bit | 16bit|32bit |8bit |
|
||||||
|
*+------------+-----+-----------+---------------+
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int64_t tGenQid64(int8_t dnodeId);
|
||||||
|
|
|
@ -14,7 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "audit.h"
|
||||||
#include "dmInt.h"
|
#include "dmInt.h"
|
||||||
|
#include "monitor.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
|
||||||
|
@ -27,6 +29,8 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
|
||||||
pMgmt->pData->dnodeId = pCfg->dnodeId;
|
pMgmt->pData->dnodeId = pCfg->dnodeId;
|
||||||
pMgmt->pData->clusterId = pCfg->clusterId;
|
pMgmt->pData->clusterId = pCfg->clusterId;
|
||||||
|
monSetDnodeId(pCfg->dnodeId);
|
||||||
|
auditSetDnodeId(pCfg->dnodeId);
|
||||||
code = dmWriteEps(pMgmt->pData);
|
code = dmWriteEps(pMgmt->pData);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
|
dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
|
||||||
|
|
|
@ -15,6 +15,9 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
#include "dmUtil.h"
|
||||||
|
#include "monitor.h"
|
||||||
|
#include "audit.h"
|
||||||
|
|
||||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -98,6 +101,9 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auditSetDnodeId(dmGetDnodeId(&pDnode->data));
|
||||||
|
monSetDnodeId(dmGetDnodeId(&pDnode->data));
|
||||||
|
|
||||||
dmSetStatus(pDnode, DND_STAT_RUNNING);
|
dmSetStatus(pDnode, DND_STAT_RUNNING);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,6 +217,7 @@ int32_t dmInitDndInfo(SDnodeData *pData);
|
||||||
|
|
||||||
// dmEps.c
|
// dmEps.c
|
||||||
int32_t dmGetDnodeSize(SDnodeData *pData);
|
int32_t dmGetDnodeSize(SDnodeData *pData);
|
||||||
|
int32_t dmGetDnodeId(SDnodeData *pData);
|
||||||
int32_t dmReadEps(SDnodeData *pData);
|
int32_t dmReadEps(SDnodeData *pData);
|
||||||
int32_t dmWriteEps(SDnodeData *pData);
|
int32_t dmWriteEps(SDnodeData *pData);
|
||||||
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
||||||
|
|
|
@ -88,3 +88,5 @@ void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmGetDnodeId(SDnodeData *pData) { return pData->dnodeId; }
|
|
@ -23,6 +23,7 @@ typedef struct {
|
||||||
SAuditCfg cfg;
|
SAuditCfg cfg;
|
||||||
SArray *records;
|
SArray *records;
|
||||||
TdThreadMutex lock;
|
TdThreadMutex lock;
|
||||||
|
int32_t dnodeId;
|
||||||
} SAudit;
|
} SAudit;
|
||||||
|
|
||||||
#endif /*_TD_AUDIT_INT_H_*/
|
#endif /*_TD_AUDIT_INT_H_*/
|
||||||
|
|
|
@ -36,6 +36,8 @@ int32_t auditInit(const SAuditCfg *pCfg) {
|
||||||
return taosThreadMutexInit(&tsAudit.lock, NULL);
|
return taosThreadMutexInit(&tsAudit.lock, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void auditSetDnodeId(int32_t dnodeId) { tsAudit.dnodeId = dnodeId; }
|
||||||
|
|
||||||
static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) {
|
static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) {
|
||||||
if (record) {
|
if (record) {
|
||||||
taosMemoryFree(record->detail);
|
taosMemoryFree(record->detail);
|
||||||
|
|
|
@ -47,6 +47,7 @@ typedef struct {
|
||||||
SMonQmInfo qmInfo;
|
SMonQmInfo qmInfo;
|
||||||
SMonBmInfo bmInfo;
|
SMonBmInfo bmInfo;
|
||||||
SHashObj *metrics;
|
SHashObj *metrics;
|
||||||
|
int32_t dnodeId;
|
||||||
} SMonitor;
|
} SMonitor;
|
||||||
|
|
||||||
void monGenClusterInfoTable(SMonInfo *pMonitor);
|
void monGenClusterInfoTable(SMonInfo *pMonitor);
|
||||||
|
|
|
@ -767,9 +767,13 @@ void monSendPromReport() {
|
||||||
}
|
}
|
||||||
if (pCont != NULL) {
|
if (pCont != NULL) {
|
||||||
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||||
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
|
char tmp[100] = {0};
|
||||||
|
(void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId));
|
||||||
|
uDebug("report cont with QID:%s", tmp);
|
||||||
|
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
|
||||||
|
tmp) != 0) {
|
||||||
uError("failed to send monitor msg");
|
uError("failed to send monitor msg");
|
||||||
}else{
|
} else {
|
||||||
(void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT);
|
(void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pCont);
|
taosMemoryFreeClear(pCont);
|
||||||
|
|
|
@ -131,6 +131,8 @@ int32_t monInit(const SMonCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void monSetDnodeId(int32_t dnodeId) { tsMonitor.dnodeId = dnodeId; }
|
||||||
|
|
||||||
void monInitVnode() {
|
void monInitVnode() {
|
||||||
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
||||||
if (tsInsertCounter == NULL) {
|
if (tsInsertCounter == NULL) {
|
||||||
|
@ -599,7 +601,11 @@ void monSendReport(SMonInfo *pMonitor) {
|
||||||
}
|
}
|
||||||
if (pCont != NULL) {
|
if (pCont != NULL) {
|
||||||
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||||
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
|
char tmp[100] = {0};
|
||||||
|
(void)snprintf(tmp, 100, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId));
|
||||||
|
uDebug("report cont with QID:%s", tmp);
|
||||||
|
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
|
||||||
|
tmp) != 0) {
|
||||||
uError("failed to send monitor msg");
|
uError("failed to send monitor msg");
|
||||||
}
|
}
|
||||||
taosMemoryFree(pCont);
|
taosMemoryFree(pCont);
|
||||||
|
@ -617,8 +623,11 @@ void monSendReportBasic(SMonInfo *pMonitor) {
|
||||||
}
|
}
|
||||||
if (pCont != NULL) {
|
if (pCont != NULL) {
|
||||||
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||||
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) !=
|
char tmp[100] = {0};
|
||||||
0) {
|
(void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId));
|
||||||
|
uDebug("report cont basic with QID:%s", tmp);
|
||||||
|
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag,
|
||||||
|
tmp) != 0) {
|
||||||
uError("failed to send monitor msg");
|
uError("failed to send monitor msg");
|
||||||
}
|
}
|
||||||
taosMemoryFree(pCont);
|
taosMemoryFree(pCont);
|
||||||
|
@ -669,8 +678,12 @@ void monSendContent(char *pCont, const char *uri) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pCont != NULL) {
|
if (pCont != NULL) {
|
||||||
|
char tmp[100] = {0};
|
||||||
|
(void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId));
|
||||||
|
uInfoL("report client cont with QID:%s", tmp);
|
||||||
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||||
if (taosSendHttpReport(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
|
if (taosSendHttpReportWithQID(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, tmp) !=
|
||||||
|
0) {
|
||||||
uError("failed to send monitor msg");
|
uError("failed to send monitor msg");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ typedef struct SHttpMsg {
|
||||||
int8_t quit;
|
int8_t quit;
|
||||||
int64_t chanId;
|
int64_t chanId;
|
||||||
int64_t seq;
|
int64_t seq;
|
||||||
|
char* qid;
|
||||||
} SHttpMsg;
|
} SHttpMsg;
|
||||||
|
|
||||||
typedef struct SHttpClient {
|
typedef struct SHttpClient {
|
||||||
|
@ -81,7 +82,7 @@ static void httpHandleQuit(SHttpMsg* msg);
|
||||||
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
|
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
|
||||||
|
|
||||||
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
|
EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg);
|
||||||
static void httpDestroyMsg(SHttpMsg* msg);
|
static void httpDestroyMsg(SHttpMsg* msg);
|
||||||
|
|
||||||
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
|
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
|
||||||
|
@ -91,24 +92,36 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1
|
||||||
static void httpModuleDestroy(SHttpModule* http);
|
static void httpModuleDestroy(SHttpModule* http);
|
||||||
|
|
||||||
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
||||||
int32_t contLen, EHttpCompFlag flag, int64_t chanId);
|
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid);
|
||||||
|
|
||||||
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
|
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead,
|
||||||
|
int32_t headLen,
|
||||||
|
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
if (flag == HTTP_FLAT) {
|
if (flag == HTTP_FLAT) {
|
||||||
|
if (qid == NULL) {
|
||||||
len = snprintf(pHead, headLen,
|
len = snprintf(pHead, headLen,
|
||||||
"POST %s HTTP/1.1\n"
|
"POST %s HTTP/1.1\n"
|
||||||
"Host: %s\n"
|
"Host: %s\n"
|
||||||
"Content-Type: application/json\n"
|
"Content-Type: application/json\n"
|
||||||
"Content-Length: %d\n\n",
|
"Content-Length: %d\n\n",
|
||||||
uri, server, contLen);
|
uri, server, contLen);
|
||||||
|
} else {
|
||||||
|
len = snprintf(pHead, headLen,
|
||||||
|
"POST %s HTTP/1.1\n"
|
||||||
|
"Host: %s\n"
|
||||||
|
"X-QID: %s\n"
|
||||||
|
"Content-Type: application/json\n"
|
||||||
|
"Content-Length: %d\n\n",
|
||||||
|
uri, server, qid, contLen);
|
||||||
|
}
|
||||||
if (len < 0 || len >= headLen) {
|
if (len < 0 || len >= headLen) {
|
||||||
code = TSDB_CODE_OUT_OF_RANGE;
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
} else if (flag == HTTP_GZIP) {
|
} else if (flag == HTTP_GZIP) {
|
||||||
|
if (qid == NULL) {
|
||||||
len = snprintf(pHead, headLen,
|
len = snprintf(pHead, headLen,
|
||||||
"POST %s HTTP/1.1\n"
|
"POST %s HTTP/1.1\n"
|
||||||
"Host: %s\n"
|
"Host: %s\n"
|
||||||
|
@ -116,6 +129,16 @@ static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t
|
||||||
"Content-Encoding: gzip\n"
|
"Content-Encoding: gzip\n"
|
||||||
"Content-Length: %d\n\n",
|
"Content-Length: %d\n\n",
|
||||||
uri, server, contLen);
|
uri, server, contLen);
|
||||||
|
} else {
|
||||||
|
len = snprintf(pHead, headLen,
|
||||||
|
"POST %s HTTP/1.1\n"
|
||||||
|
"Host: %s\n"
|
||||||
|
"X-QID: %s\n"
|
||||||
|
"Content-Type: application/json\n"
|
||||||
|
"Content-Encoding: gzip\n"
|
||||||
|
"Content-Length: %d\n\n",
|
||||||
|
uri, server, qid, contLen);
|
||||||
|
}
|
||||||
if (len < 0 || len >= headLen) {
|
if (len < 0 || len >= headLen) {
|
||||||
code = TSDB_CODE_OUT_OF_RANGE;
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
@ -218,7 +241,7 @@ static void* httpThread(void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
|
EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg) {
|
||||||
int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
|
int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
|
||||||
if (server == NULL || uri == NULL) {
|
if (server == NULL || uri == NULL) {
|
||||||
tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
|
tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
|
||||||
|
@ -243,6 +266,10 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port,
|
||||||
msg->server = taosStrdup(server);
|
msg->server = taosStrdup(server);
|
||||||
msg->uri = taosStrdup(uri);
|
msg->uri = taosStrdup(uri);
|
||||||
msg->cont = taosMemoryMalloc(contLen);
|
msg->cont = taosMemoryMalloc(contLen);
|
||||||
|
if (qid != NULL)
|
||||||
|
msg->qid = taosStrdup(qid);
|
||||||
|
else
|
||||||
|
msg->qid = NULL;
|
||||||
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
|
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
*httpMsg = NULL;
|
*httpMsg = NULL;
|
||||||
|
@ -263,6 +290,7 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
||||||
taosMemoryFree(msg->server);
|
taosMemoryFree(msg->server);
|
||||||
taosMemoryFree(msg->uri);
|
taosMemoryFree(msg->uri);
|
||||||
taosMemoryFree(msg->cont);
|
taosMemoryFree(msg->cont);
|
||||||
|
if (msg->qid != NULL) taosMemoryFree(msg->qid);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
}
|
}
|
||||||
static void httpDestroyMsgWrapper(void* cont, void* param) {
|
static void httpDestroyMsgWrapper(void* cont, void* param) {
|
||||||
|
@ -561,7 +589,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag);
|
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, msg->qid, header, cap, msg->flag);
|
||||||
if (headLen < 0) {
|
if (headLen < 0) {
|
||||||
code = headLen;
|
code = headLen;
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -590,6 +618,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
cli->chanId = chanId;
|
cli->chanId = chanId;
|
||||||
cli->addr = msg->server;
|
cli->addr = msg->server;
|
||||||
cli->port = msg->port;
|
cli->port = msg->port;
|
||||||
|
if (msg->qid != NULL) taosMemoryFree(msg->qid);
|
||||||
taosMemoryFree(msg->uri);
|
taosMemoryFree(msg->uri);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
|
||||||
|
@ -675,10 +704,10 @@ void httpModuleDestroy2(SHttpModule* http) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
||||||
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
|
int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) {
|
||||||
SHttpModule* load = NULL;
|
SHttpModule* load = NULL;
|
||||||
SHttpMsg* msg = NULL;
|
SHttpMsg* msg = NULL;
|
||||||
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg);
|
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
@ -714,14 +743,19 @@ _ERROR:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag, int64_t chanId) {
|
EHttpCompFlag flag, int64_t chanId, const char* qid) {
|
||||||
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId);
|
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId, qid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
|
return taosSendHttpReportWithQID(server, uri, port, pCont, contLen, flag, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
|
EHttpCompFlag flag, const char* qid) {
|
||||||
(void)taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
(void)taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
||||||
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId);
|
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
||||||
|
|
|
@ -65,3 +65,18 @@ int64_t tGenIdPI64(void) {
|
||||||
|
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tGenQid64(int8_t dnodeId) {
|
||||||
|
int64_t id = dnodeId;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1);
|
||||||
|
|
||||||
|
id = (id << 56) | (val & 0xFFFFF) << 8;
|
||||||
|
if (id) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
Loading…
Reference in New Issue