[TD-10430] move telem from dnode to mnode

This commit is contained in:
Shengliang Guan 2021-10-25 10:47:25 +08:00
parent 3b1add19f1
commit 3f592b7be8
6 changed files with 66 additions and 70 deletions

View File

@ -14,8 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "dnodeCheck.h" #include "dnodeCheck.h"
#define MIN_AVAIL_MEMORY_MB 32 #define MIN_AVAIL_MEMORY_MB 32

View File

@ -28,7 +28,6 @@
#include "dnodeMain.h" #include "dnodeMain.h"
#include "dnodeMnodeEps.h" #include "dnodeMnodeEps.h"
#include "dnodeStatus.h" #include "dnodeStatus.h"
#include "dnodeTelem.h"
#include "dnodeTrans.h" #include "dnodeTrans.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "vnode.h"
@ -73,7 +72,6 @@ int32_t dnodeInit() {
taosStepAdd(tsSteps, "dnode-mnode", dnodeInitMnodeModule, mnodeCleanup); taosStepAdd(tsSteps, "dnode-mnode", dnodeInitMnodeModule, mnodeCleanup);
taosStepAdd(tsSteps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); taosStepAdd(tsSteps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
taosStepAdd(tsSteps, "dnode-status", dnodeInitStatus, dnodeCleanupStatus); taosStepAdd(tsSteps, "dnode-status", dnodeInitStatus, dnodeCleanupStatus);
taosStepAdd(tsSteps, "dnode-telem", dnodeInitTelem, dnodeCleanupTelem);
//taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup); //taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup);
taosStepExec(tsSteps); taosStepExec(tsSteps);

View File

@ -16,6 +16,7 @@
#ifndef _TD_MNODE_DEF_H_ #ifndef _TD_MNODE_DEF_H_
#define _TD_MNODE_DEF_H_ #define _TD_MNODE_DEF_H_
#include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"

View File

@ -13,19 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_TELEMETRY_H_ #ifndef _TD_MNODE_TELEMETRY_H_
#define _TD_DNODE_TELEMETRY_H_ #define _TD_MNODE_TELEMETRY_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h" #include "mnodeInt.h"
int32_t dnodeInitTelem(); int32_t mnodeInitTelem();
void dnodeCleanupTelem(); void mnodeCleanupTelem();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_TELEMETRY_H_*/ #endif /*_TD_MNODE_TELEMETRY_H_*/

View File

@ -14,12 +14,9 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mnodeTelem.h"
#include "tbuffer.h" #include "tbuffer.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeCfg.h"
#include "dnodeTelem.h"
#include "mnode.h"
#define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80 #define TELEMETRY_PORT 80
@ -38,9 +35,9 @@ static struct {
char email[TSDB_FQDN_LEN]; char email[TSDB_FQDN_LEN];
} tsTelem; } tsTelem;
static void dnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } static void mnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
static void dnodeCloseObject(SBufferWriter* bw) { static void mnodeCloseObject(SBufferWriter* bw) {
size_t len = tbufTell(bw); size_t len = tbufTell(bw);
if (tbufGetData(bw, false)[len - 1] == ',') { if (tbufGetData(bw, false)[len - 1] == ',') {
tbufWriteCharAt(bw, len - 1, '}'); tbufWriteCharAt(bw, len - 1, '}');
@ -66,14 +63,14 @@ static void closeArray(SBufferWriter* bw) {
} }
#endif #endif
static void dnodeWriteString(SBufferWriter* bw, const char* str) { static void mnodeWriteString(SBufferWriter* bw, const char* str) {
tbufWriteChar(bw, '"'); tbufWriteChar(bw, '"');
tbufWrite(bw, str, strlen(str)); tbufWrite(bw, str, strlen(str));
tbufWriteChar(bw, '"'); tbufWriteChar(bw, '"');
} }
static void dnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) { static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
dnodeWriteString(bw, k); mnodeWriteString(bw, k);
tbufWriteChar(bw, ':'); tbufWriteChar(bw, ':');
char buf[32]; char buf[32];
sprintf(buf, "%" PRId64, v); sprintf(buf, "%" PRId64, v);
@ -81,14 +78,14 @@ static void dnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
tbufWriteChar(bw, ','); tbufWriteChar(bw, ',');
} }
static void dnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) { static void mnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) {
dnodeWriteString(bw, k); mnodeWriteString(bw, k);
tbufWriteChar(bw, ':'); tbufWriteChar(bw, ':');
dnodeWriteString(bw, v); mnodeWriteString(bw, v);
tbufWriteChar(bw, ','); tbufWriteChar(bw, ',');
} }
static void dnodeAddCpuInfo(SBufferWriter* bw) { static void mnodeAddCpuInfo(SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
int32_t done = 0; int32_t done = 0;
@ -102,11 +99,11 @@ static void dnodeAddCpuInfo(SBufferWriter* bw) {
line[size - 1] = '\0'; line[size - 1] = '\0';
if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) { if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) {
const char* v = strchr(line, ':') + 2; const char* v = strchr(line, ':') + 2;
dnodeAddStringField(bw, "cpuModel", v); mnodeAddStringField(bw, "cpuModel", v);
done |= 1; done |= 1;
} else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) { } else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) {
const char* v = strchr(line, ':') + 2; const char* v = strchr(line, ':') + 2;
dnodeWriteString(bw, "numOfCpu"); mnodeWriteString(bw, "numOfCpu");
tbufWriteChar(bw, ':'); tbufWriteChar(bw, ':');
tbufWrite(bw, v, strlen(v)); tbufWrite(bw, v, strlen(v));
tbufWriteChar(bw, ','); tbufWriteChar(bw, ',');
@ -118,7 +115,7 @@ static void dnodeAddCpuInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void dnodeAddOsInfo(SBufferWriter* bw) { static void mnodeAddOsInfo(SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
@ -135,7 +132,7 @@ static void dnodeAddOsInfo(SBufferWriter* bw) {
p++; p++;
line[size - 2] = 0; line[size - 2] = 0;
} }
dnodeAddStringField(bw, "os", p); mnodeAddStringField(bw, "os", p);
break; break;
} }
} }
@ -144,7 +141,7 @@ static void dnodeAddOsInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void dnodeAddMemoryInfo(SBufferWriter* bw) { static void mnodeAddMemoryInfo(SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
@ -158,7 +155,7 @@ static void dnodeAddMemoryInfo(SBufferWriter* bw) {
if (strncmp(line, "MemTotal", 8) == 0) { if (strncmp(line, "MemTotal", 8) == 0) {
const char* p = strchr(line, ':') + 1; const char* p = strchr(line, ':') + 1;
while (*p == ' ') p++; while (*p == ' ') p++;
dnodeAddStringField(bw, "memory", p); mnodeAddStringField(bw, "memory", p);
break; break;
} }
} }
@ -167,57 +164,57 @@ static void dnodeAddMemoryInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void dnodeAddVersionInfo(SBufferWriter* bw) { static void mnodeAddVersionInfo(SBufferWriter* bw) {
dnodeAddStringField(bw, "version", version); mnodeAddStringField(bw, "version", version);
dnodeAddStringField(bw, "buildInfo", buildinfo); mnodeAddStringField(bw, "buildInfo", buildinfo);
dnodeAddStringField(bw, "gitInfo", gitinfo); mnodeAddStringField(bw, "gitInfo", gitinfo);
dnodeAddStringField(bw, "email", tsTelem.email); mnodeAddStringField(bw, "email", tsTelem.email);
} }
static void dnodeAddRuntimeInfo(SBufferWriter* bw) { static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
SMnodeStat stat = {0}; SMnodeStat stat = {0};
if (mnodeGetStatistics(&stat) != 0) { if (mnodeGetStatistics(&stat) != 0) {
return; return;
} }
dnodeAddIntField(bw, "numOfDnode", stat.numOfDnode); mnodeAddIntField(bw, "numOfDnode", stat.numOfDnode);
dnodeAddIntField(bw, "numOfMnode", stat.numOfMnode); mnodeAddIntField(bw, "numOfMnode", stat.numOfMnode);
dnodeAddIntField(bw, "numOfVgroup", stat.numOfVgroup); mnodeAddIntField(bw, "numOfVgroup", stat.numOfVgroup);
dnodeAddIntField(bw, "numOfDatabase", stat.numOfDatabase); mnodeAddIntField(bw, "numOfDatabase", stat.numOfDatabase);
dnodeAddIntField(bw, "numOfSuperTable", stat.numOfSuperTable); mnodeAddIntField(bw, "numOfSuperTable", stat.numOfSuperTable);
dnodeAddIntField(bw, "numOfChildTable", stat.numOfChildTable); mnodeAddIntField(bw, "numOfChildTable", stat.numOfChildTable);
dnodeAddIntField(bw, "numOfColumn", stat.numOfColumn); mnodeAddIntField(bw, "numOfColumn", stat.numOfColumn);
dnodeAddIntField(bw, "numOfPoint", stat.totalPoints); mnodeAddIntField(bw, "numOfPoint", stat.totalPoints);
dnodeAddIntField(bw, "totalStorage", stat.totalStorage); mnodeAddIntField(bw, "totalStorage", stat.totalStorage);
dnodeAddIntField(bw, "compStorage", stat.compStorage); mnodeAddIntField(bw, "compStorage", stat.compStorage);
} }
static void dnodeSendTelemetryReport() { static void mnodeSendTelemetryReport() {
char buf[128] = {0}; char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); mTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
return; return;
} }
SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
if (fd < 0) { if (fd < 0) {
dTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); mTrace("failed to create socket for telemetry, reason:%s", strerror(errno));
return; return;
} }
char clusterId[TSDB_CLUSTER_ID_LEN] = {0}; char clusterId[TSDB_CLUSTER_ID_LEN] = {0};
dnodeGetClusterId(clusterId); mnodeGetClusterId(clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
dnodeBeginObject(&bw); mnodeBeginObject(&bw);
dnodeAddStringField(&bw, "instanceId", clusterId); mnodeAddStringField(&bw, "instanceId", clusterId);
dnodeAddIntField(&bw, "reportVersion", 1); mnodeAddIntField(&bw, "reportVersion", 1);
dnodeAddOsInfo(&bw); mnodeAddOsInfo(&bw);
dnodeAddCpuInfo(&bw); mnodeAddCpuInfo(&bw);
dnodeAddMemoryInfo(&bw); mnodeAddMemoryInfo(&bw);
dnodeAddVersionInfo(&bw); mnodeAddVersionInfo(&bw);
dnodeAddRuntimeInfo(&bw); mnodeAddRuntimeInfo(&bw);
dnodeCloseObject(&bw); mnodeCloseObject(&bw);
const char* header = const char* header =
"POST /report HTTP/1.1\n" "POST /report HTTP/1.1\n"
@ -235,18 +232,18 @@ static void dnodeSendTelemetryReport() {
// read something to avoid nginx error 499 // read something to avoid nginx error 499
if (taosReadSocket(fd, buf, 10) < 0) { if (taosReadSocket(fd, buf, 10) < 0) {
dTrace("failed to receive response since %s", strerror(errno)); mTrace("failed to receive response since %s", strerror(errno));
} }
taosCloseSocket(fd); taosCloseSocket(fd);
} }
static void* dnodeTelemThreadFp(void* param) { static void* mnodeTelemThreadFp(void* param) {
struct timespec end = {0}; struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("dnode-telem"); setThreadName("mnode-telem");
while (!tsTelem.exit) { while (!tsTelem.exit) {
int32_t r = 0; int32_t r = 0;
@ -258,7 +255,7 @@ static void* dnodeTelemThreadFp(void* param) {
if (r != ETIMEDOUT) continue; if (r != ETIMEDOUT) continue;
if (mnodeGetStatus() == MN_STATUS_READY) { if (mnodeGetStatus() == MN_STATUS_READY) {
dnodeSendTelemetryReport(); mnodeSendTelemetryReport();
} }
end.tv_sec += REPORT_INTERVAL; end.tv_sec += REPORT_INTERVAL;
} }
@ -266,20 +263,20 @@ static void* dnodeTelemThreadFp(void* param) {
return NULL; return NULL;
} }
static void dnodeGetEmail(char* filepath) { static void mnodeGetEmail(char* filepath) {
int32_t fd = taosOpenFileRead(filepath); int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) { if (fd < 0) {
return; return;
} }
if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) { if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
} }
taosCloseFile(fd); taosCloseFile(fd);
} }
int32_t dnodeInitTelem() { int32_t mnodeInitTelem() {
tsTelem.enable = tsEnableTelemetryReporting; tsTelem.enable = tsEnableTelemetryReporting;
if (!tsTelem.enable) return 0; if (!tsTelem.enable) return 0;
@ -288,23 +285,23 @@ int32_t dnodeInitTelem() {
pthread_cond_init(&tsTelem.cond, NULL); pthread_cond_init(&tsTelem.cond, NULL);
tsTelem.email[0] = 0; tsTelem.email[0] = 0;
dnodeGetEmail("/usr/local/taos/email"); mnodeGetEmail("/usr/local/taos/email");
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsTelem.thread, &attr, dnodeTelemThreadFp, NULL); int32_t code = pthread_create(&tsTelem.thread, &attr, mnodeTelemThreadFp, NULL);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
if (code != 0) { if (code != 0) {
dTrace("failed to create telemetry thread since :%s", strerror(code)); mTrace("failed to create telemetry thread since :%s", strerror(code));
} }
dInfo("dnode telemetry is initialized"); mInfo("mnode telemetry is initialized");
return 0; return 0;
} }
void dnodeCleanupTelem() { void mnodeCleanupTelem() {
if (!tsTelem.enable) return; if (!tsTelem.enable) return;
if (taosCheckPthreadValid(tsTelem.thread)) { if (taosCheckPthreadValid(tsTelem.thread)) {

View File

@ -34,6 +34,7 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#include "mnodeWorker.h" #include "mnodeWorker.h"
#include "mnodeTelem.h"
static struct { static struct {
int32_t state; int32_t state;
@ -127,6 +128,7 @@ static int32_t mnodeInitStep2() {
taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow); taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync); taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync);
taosStepAdd(steps, "mnode-worker", NULL, mnodeCleanupWorker); taosStepAdd(steps, "mnode-worker", NULL, mnodeCleanupWorker);
taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem);
taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer); taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer);
tsMint.steps2 = steps; tsMint.steps2 = steps;