pull telemetry by timer

This commit is contained in:
Shengliang Guan 2022-02-18 17:29:38 +08:00
parent 8a09500429
commit 12d1bf03ac
3 changed files with 20 additions and 71 deletions

View File

@ -56,12 +56,9 @@ typedef struct {
} SProfileMgmt;
typedef struct {
int8_t enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
int8_t enable;
SRWLatch lock;
char email[TSDB_FQDN_LEN];
} STelemMgmt;
typedef struct {

View File

@ -234,32 +234,15 @@ static void mndSendTelemetryReport(SMnode* pMnode) {
taosCloseSocket(fd);
}
static void* mndTelemThreadFp(void* param) {
SMnode* pMnode = param;
static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) {
SMnode* pMnode = pReq->pMnode;
STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!pMgmt->enable) return 0;
struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("mnd-telem");
while (!pMgmt->exit) {
int32_t r = 0;
struct timespec ts = end;
pthread_mutex_lock(&pMgmt->lock);
r = pthread_cond_timedwait(&pMgmt->cond, &pMgmt->lock, &ts);
pthread_mutex_unlock(&pMgmt->lock);
if (r == 0) break;
if (r != ETIMEDOUT) continue;
if (mndIsMaster(pMnode)) {
mndSendTelemetryReport(pMnode);
}
end.tv_sec += REPORT_INTERVAL;
}
return NULL;
taosWLockLatch(&pMgmt->lock);
mndSendTelemetryReport(pMnode);
taosWUnLockLatch(&pMgmt->lock);
return 0;
}
static void mndGetEmail(SMnode* pMnode, char* filepath) {
@ -280,43 +263,12 @@ static void mndGetEmail(SMnode* pMnode, char* filepath) {
int32_t mndInitTelem(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
pMgmt->enable = pMnode->cfg.enableTelem;
if (!pMgmt->enable) return 0;
pMgmt->exit = 0;
pthread_mutex_init(&pMgmt->lock, NULL);
pthread_cond_init(&pMgmt->cond, NULL);
pMgmt->email[0] = 0;
taosInitRWLatch(&pMgmt->lock);
mndGetEmail(pMnode, "/usr/local/taos/email");
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode);
pthread_attr_destroy(&attr);
if (code != 0) {
mDebug("failed to create telemetry thread since :%s", strerror(code));
}
mInfo("mnd telemetry is initialized");
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
mDebug("mnode telemetry is initialized");
return 0;
}
void mndCleanupTelem(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!pMgmt->enable) return;
if (taosCheckPthreadValid(pMgmt->thread)) {
pthread_mutex_lock(&pMgmt->lock);
pMgmt->exit = 1;
pthread_cond_signal(&pMgmt->cond);
pthread_mutex_unlock(&pMgmt->lock);
pthread_join(pMgmt->thread, NULL);
}
pthread_mutex_destroy(&pMgmt->lock);
pthread_cond_destroy(&pMgmt->cond);
}
void mndCleanupTelem(SMnode* pMnode) {}

View File

@ -77,7 +77,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
return pReq;
}
static void mndExecuteTransaction(void *param, void *tmrId) {
static void mndPullupTrans(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
int32_t contLen = 0;
@ -86,7 +86,7 @@ static void mndExecuteTransaction(void *param, void *tmrId) {
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}
taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
}
static void mndCalMqRebalance(void *param, void *tmrId) {
@ -101,7 +101,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}
static void mndExecuteTelemetry(void *param, void *tmrId) {
static void mndPullupTelem(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
int32_t contLen = 0;
@ -110,7 +110,7 @@ static void mndExecuteTelemetry(void *param, void *tmrId) {
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
}
taosTmrReset(mndExecuteTelemetry, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
}
static int32_t mndInitTimer(SMnode *pMnode) {
@ -120,7 +120,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1;
}
if (taosTmrReset(mndExecuteTransaction, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -130,7 +130,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1;
}
if (taosTmrReset(mndCalMqRebalance, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer)) {
if (taosTmrReset(mndPullupTelem, 1000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}