From 12d1bf03ac133088b1cf32f7004c0d4d5ff9ce2d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Feb 2022 17:29:38 +0800 Subject: [PATCH] pull telemetry by timer --- source/dnode/mnode/impl/inc/mndInt.h | 9 ++-- source/dnode/mnode/impl/src/mndTelem.c | 70 ++++---------------------- source/dnode/mnode/impl/src/mnode.c | 12 ++--- 3 files changed, 20 insertions(+), 71 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ccb523d11e..6c7d4be581 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -56,12 +56,9 @@ typedef struct { } SProfileMgmt; typedef struct { - int8_t enable; - pthread_mutex_t lock; - pthread_cond_t cond; - volatile int32_t exit; - pthread_t thread; - char email[TSDB_FQDN_LEN]; + int8_t enable; + SRWLatch lock; + char email[TSDB_FQDN_LEN]; } STelemMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 5beb1b10e3..a9cf8f9fab 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -234,32 +234,15 @@ static void mndSendTelemetryReport(SMnode* pMnode) { taosCloseSocket(fd); } -static void* mndTelemThreadFp(void* param) { - SMnode* pMnode = param; +static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) { + SMnode* pMnode = pReq->pMnode; STelemMgmt* pMgmt = &pMnode->telemMgmt; + if (!pMgmt->enable) return 0; - struct timespec end = {0}; - clock_gettime(CLOCK_REALTIME, &end); - end.tv_sec += 300; // wait 5 minutes before send first report - - setThreadName("mnd-telem"); - - while (!pMgmt->exit) { - int32_t r = 0; - struct timespec ts = end; - pthread_mutex_lock(&pMgmt->lock); - r = pthread_cond_timedwait(&pMgmt->cond, &pMgmt->lock, &ts); - pthread_mutex_unlock(&pMgmt->lock); - if (r == 0) break; - if (r != ETIMEDOUT) continue; - - if (mndIsMaster(pMnode)) { - mndSendTelemetryReport(pMnode); - } - end.tv_sec += REPORT_INTERVAL; - } - - return NULL; + taosWLockLatch(&pMgmt->lock); + mndSendTelemetryReport(pMnode); + taosWUnLockLatch(&pMgmt->lock); + return 0; } static void mndGetEmail(SMnode* pMnode, char* filepath) { @@ -280,43 +263,12 @@ static void mndGetEmail(SMnode* pMnode, char* filepath) { int32_t mndInitTelem(SMnode* pMnode) { STelemMgmt* pMgmt = &pMnode->telemMgmt; pMgmt->enable = pMnode->cfg.enableTelem; - - if (!pMgmt->enable) return 0; - - pMgmt->exit = 0; - pthread_mutex_init(&pMgmt->lock, NULL); - pthread_cond_init(&pMgmt->cond, NULL); - pMgmt->email[0] = 0; - + taosInitRWLatch(&pMgmt->lock); mndGetEmail(pMnode, "/usr/local/taos/email"); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - - int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode); - pthread_attr_destroy(&attr); - if (code != 0) { - mDebug("failed to create telemetry thread since :%s", strerror(code)); - } - - mInfo("mnd telemetry is initialized"); + mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer); + mDebug("mnode telemetry is initialized"); return 0; } -void mndCleanupTelem(SMnode* pMnode) { - STelemMgmt* pMgmt = &pMnode->telemMgmt; - if (!pMgmt->enable) return; - - if (taosCheckPthreadValid(pMgmt->thread)) { - pthread_mutex_lock(&pMgmt->lock); - pMgmt->exit = 1; - pthread_cond_signal(&pMgmt->cond); - pthread_mutex_unlock(&pMgmt->lock); - - pthread_join(pMgmt->thread, NULL); - } - - pthread_mutex_destroy(&pMgmt->lock); - pthread_cond_destroy(&pMgmt->cond); -} +void mndCleanupTelem(SMnode* pMnode) {} diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d771dd6e17..b9085c0600 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -77,7 +77,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { return pReq; } -static void mndExecuteTransaction(void *param, void *tmrId) { +static void mndPullupTrans(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; @@ -86,7 +86,7 @@ static void mndExecuteTransaction(void *param, void *tmrId) { pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } - taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); } static void mndCalMqRebalance(void *param, void *tmrId) { @@ -101,7 +101,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); } -static void mndExecuteTelemetry(void *param, void *tmrId) { +static void mndPullupTelem(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; @@ -110,7 +110,7 @@ static void mndExecuteTelemetry(void *param, void *tmrId) { pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } - taosTmrReset(mndExecuteTelemetry, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); + taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); } static int32_t mndInitTimer(SMnode *pMnode) { @@ -120,7 +120,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) { + if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -130,7 +130,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndCalMqRebalance, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer)) { + if (taosTmrReset(mndPullupTelem, 1000, pMnode, pMnode->timer, &pMnode->telemTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; }