diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 4fa69f1b4f..2e786ab2b3 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -49,6 +49,7 @@ typedef struct { } SAuditRecord; int32_t auditInit(const SAuditCfg *pCfg); +void auditSetDnodeId(int32_t dnodeId); void auditCleanup(); int32_t auditSend(SJson *pJson); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 96ac53ef4e..636e7a3143 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -218,6 +218,7 @@ typedef struct { } SDmNotifyHandle; int32_t monInit(const SMonCfg *pCfg); +void monSetDnodeId(int32_t dnodeId); void monInitVnode(); void monCleanup(); void monRecordLog(int64_t ts, ELogLevel level, const char *content); diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index a2f6b5ac8b..9f635b8523 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -28,9 +28,11 @@ typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid); int64_t taosInitHttpChan(); int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId); + EHttpCompFlag flag, int64_t chanId, const char* qid); void taosDestroyHttpChan(int64_t chanId); #ifdef __cplusplus diff --git a/include/util/tuuid.h b/include/util/tuuid.h index 315c2ad497..a4e1059371 100644 --- a/include/util/tuuid.h +++ b/include/util/tuuid.h @@ -37,3 +37,14 @@ int32_t tGenIdPI32(void); * @return */ int64_t tGenIdPI64(void); + +/** + * Generate an qid + *+------------+-----+-----------+---------------+ + *| nodeid| 0| serial number | 0 | + *+------------+-----+-----------+---------------+ + *| 8bit | 16bit|32bit |8bit | + *+------------+-----+-----------+---------------+ + * @return + */ +int64_t tGenQid64(int8_t dnodeId); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f9ae94c53f..ae12bf6c99 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -14,7 +14,9 @@ */ #define _DEFAULT_SOURCE +#include "audit.h" #include "dmInt.h" +#include "monitor.h" #include "systable.h" #include "tchecksum.h" @@ -27,6 +29,8 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { (void)taosThreadRwlockWrlock(&pMgmt->pData->lock); pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->clusterId = pCfg->clusterId; + monSetDnodeId(pCfg->dnodeId); + auditSetDnodeId(pCfg->dnodeId); code = dmWriteEps(pMgmt->pData); if (code != 0) { dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId, diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index f67901d6d5..3cb9030f60 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -15,6 +15,9 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" +#include "dmUtil.h" +#include "monitor.h" +#include "audit.h" int32_t dmOpenNode(SMgmtWrapper *pWrapper) { int32_t code = 0; @@ -98,6 +101,9 @@ static int32_t dmOpenNodes(SDnode *pDnode) { } } + auditSetDnodeId(dmGetDnodeId(&pDnode->data)); + monSetDnodeId(dmGetDnodeId(&pDnode->data)); + dmSetStatus(pDnode, DND_STAT_RUNNING); return 0; } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 425f10392f..b5842acbad 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -217,6 +217,7 @@ int32_t dmInitDndInfo(SDnodeData *pData); // dmEps.c int32_t dmGetDnodeSize(SDnodeData *pData); +int32_t dmGetDnodeId(SDnodeData *pData); int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); diff --git a/source/dnode/mgmt/node_util/src/dmUtil.c b/source/dnode/mgmt/node_util/src/dmUtil.c index f73f041f4a..b50c746c92 100644 --- a/source/dnode/mgmt/node_util/src/dmUtil.c +++ b/source/dnode/mgmt/node_util/src/dmUtil.c @@ -88,3 +88,5 @@ void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) { } return; } + +int32_t dmGetDnodeId(SDnodeData *pData) { return pData->dnodeId; } \ No newline at end of file diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index e5fed2e473..7af48e3d41 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -23,6 +23,7 @@ typedef struct { SAuditCfg cfg; SArray *records; TdThreadMutex lock; + int32_t dnodeId; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index cdc4a964c7..52a4c1c523 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -36,6 +36,8 @@ int32_t auditInit(const SAuditCfg *pCfg) { return taosThreadMutexInit(&tsAudit.lock, NULL); } +void auditSetDnodeId(int32_t dnodeId) { tsAudit.dnodeId = dnodeId; } + static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) { if (record) { taosMemoryFree(record->detail); diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index 7fc718393b..e7d2552e57 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -47,6 +47,7 @@ typedef struct { SMonQmInfo qmInfo; SMonBmInfo bmInfo; SHashObj *metrics; + int32_t dnodeId; } SMonitor; void monGenClusterInfoTable(SMonInfo *pMonitor); diff --git a/source/libs/monitor/src/monFramework.c b/source/libs/monitor/src/monFramework.c index c3b63787a6..76473ccbb1 100644 --- a/source/libs/monitor/src/monFramework.c +++ b/source/libs/monitor/src/monFramework.c @@ -767,9 +767,13 @@ void monSendPromReport() { } if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + char tmp[100] = {0}; + (void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); - }else{ + } else { (void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT); } taosMemoryFreeClear(pCont); diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index 14d62fbf0e..4808ae0fdf 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -131,6 +131,8 @@ int32_t monInit(const SMonCfg *pCfg) { return 0; } +void monSetDnodeId(int32_t dnodeId) { tsMonitor.dnodeId = dnodeId; } + void monInitVnode() { if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; if (tsInsertCounter == NULL) { @@ -599,7 +601,11 @@ void monSendReport(SMonInfo *pMonitor) { } if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + char tmp[100] = {0}; + (void)snprintf(tmp, 100, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); } taosMemoryFree(pCont); @@ -617,8 +623,11 @@ void monSendReportBasic(SMonInfo *pMonitor) { } if (pCont != NULL) { EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != - 0) { + char tmp[100] = {0}; + (void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId)); + uDebug("report cont basic with QID:%s", tmp); + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, tsMonFwBasicUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, + tmp) != 0) { uError("failed to send monitor msg"); } taosMemoryFree(pCont); @@ -669,8 +678,12 @@ void monSendContent(char *pCont, const char *uri) { } } if (pCont != NULL) { + char tmp[100] = {0}; + (void)sprintf(tmp, "0x%" PRIxLEAST64, tGenQid64(tsMonitor.dnodeId)); + uInfoL("report client cont with QID:%s", tmp); EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT; - if (taosSendHttpReport(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) { + if (taosSendHttpReportWithQID(tsMonitor.cfg.server, uri, tsMonitor.cfg.port, pCont, strlen(pCont), flag, tmp) != + 0) { uError("failed to send monitor msg"); } } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index f62d728511..d4c973926e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -53,6 +53,7 @@ typedef struct SHttpMsg { int8_t quit; int64_t chanId; int64_t seq; + char* qid; } SHttpMsg; typedef struct SHttpClient { @@ -81,7 +82,7 @@ static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); + EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg); static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); @@ -91,31 +92,53 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 static void httpModuleDestroy(SHttpModule* http); static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, - int32_t contLen, EHttpCompFlag flag, int64_t chanId); + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid); -static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, +static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, const char* qid, char* pHead, + int32_t headLen, EHttpCompFlag flag) { int32_t code = 0; int32_t len = 0; if (flag == HTTP_FLAT) { - len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - uri, server, contLen); + if (qid == NULL) { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, contLen); + } else { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "X-QID: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, qid, contLen); + } if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } } else if (flag == HTTP_GZIP) { - len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Encoding: gzip\n" - "Content-Length: %d\n\n", - uri, server, contLen); + if (qid == NULL) { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, contLen); + } else { + len = snprintf(pHead, headLen, + "POST %s HTTP/1.1\n" + "Host: %s\n" + "X-QID: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, qid, contLen); + } if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } @@ -218,7 +241,7 @@ static void* httpThread(void* arg) { } static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { + EHttpCompFlag flag, int64_t chanId, const char* qid, SHttpMsg** httpMsg) { int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1); if (server == NULL || uri == NULL) { tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); @@ -243,6 +266,10 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, msg->server = taosStrdup(server); msg->uri = taosStrdup(uri); msg->cont = taosMemoryMalloc(contLen); + if (qid != NULL) + msg->qid = taosStrdup(qid); + else + msg->qid = NULL; if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { httpDestroyMsg(msg); *httpMsg = NULL; @@ -263,6 +290,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->server); taosMemoryFree(msg->uri); taosMemoryFree(msg->cont); + if (msg->qid != NULL) taosMemoryFree(msg->qid); taosMemoryFree(msg); } static void httpDestroyMsgWrapper(void* cont, void* param) { @@ -561,7 +589,7 @@ static void httpHandleReq(SHttpMsg* msg) { goto END; } - int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, msg->qid, header, cap, msg->flag); if (headLen < 0) { code = headLen; goto END; @@ -590,6 +618,7 @@ static void httpHandleReq(SHttpMsg* msg) { cli->chanId = chanId; cli->addr = msg->server; cli->port = msg->port; + if (msg->qid != NULL) taosMemoryFree(msg->qid); taosMemoryFree(msg->uri); taosMemoryFree(msg); @@ -675,10 +704,10 @@ void httpModuleDestroy2(SHttpModule* http) { } static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, - int32_t contLen, EHttpCompFlag flag, int64_t chanId) { + int32_t contLen, EHttpCompFlag flag, int64_t chanId, const char* qid) { SHttpModule* load = NULL; SHttpMsg* msg = NULL; - int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg); + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, qid, &msg); if (code != 0) { goto _ERROR; } @@ -714,14 +743,19 @@ _ERROR: } int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId) { - return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId); + EHttpCompFlag flag, int64_t chanId, const char* qid) { + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId, qid); } int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + return taosSendHttpReportWithQID(server, uri, port, pCont, contLen, flag, NULL); +} + +int32_t taosSendHttpReportWithQID(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, const char* qid) { (void)taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId); + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId, qid); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } diff --git a/source/util/src/tuuid.c b/source/util/src/tuuid.c index 9d749cc002..4472189d37 100644 --- a/source/util/src/tuuid.c +++ b/source/util/src/tuuid.c @@ -65,3 +65,18 @@ int64_t tGenIdPI64(void) { return id; } + +int64_t tGenQid64(int8_t dnodeId) { + int64_t id = dnodeId; + + while (true) { + int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1); + + id = (id << 56) | (val & 0xFFFFF) << 8; + if (id) { + break; + } + } + + return id; +} \ No newline at end of file