From ca43acde1e7ee1fce13657ddd1525e89f83a2588 Mon Sep 17 00:00:00 2001 From: AlexDuan <417921451@qq.com> Date: Fri, 27 Aug 2021 18:08:16 +0800 Subject: [PATCH] use new thread replace timer --- src/tsdb/inc/tsdbint.h | 2 +- src/tsdb/src/tsdbHealth.c | 31 +++++++++++++++----- src/tsdb/src/tsdbMain.c | 9 +++--- src/util/inc/tthread.h | 37 +++++++++++++++++++++++ src/util/src/tthread.c | 62 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 src/util/inc/tthread.h create mode 100644 src/util/src/tthread.c diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 3bbc4bd111..80e9297579 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -97,7 +97,7 @@ struct STsdbRepo { SMergeBuf mergeBuf; //used when update=2 int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? - void* tmrCtrl; + pthread_t* pthread; }; #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index 63dffff3dd..44cc3ff423 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -24,6 +24,7 @@ #include "tsdbLog.h" #include "tsdbHealth.h" #include "ttimer.h" +#include "tthread.h" // return malloc new block count @@ -47,30 +48,44 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { } // switch anther thread to run -void cbKillQueryFree(void* param1, void* param2) { +void* cbKillQueryFree(void* param1) { STsdbRepo* pRepo = (STsdbRepo*)param1; // vnode if(pRepo->appH.notifyStatus) { pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS); } + + // free + if(pRepo->pthread){ + void* p = pRepo->pthread; + pRepo->pthread = NULL; + free(p); + } + + return NULL; } // return true do free , false do nothing bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { - // 1 start timer - if(pRepo->tmrCtrl == NULL){ - pRepo->tmrCtrl = taosTmrInit(0, 0, 0, "REPO"); + // check previous running + if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) { + tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks); + return false; } - - tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, pRepo->tmrCtrl); - return hTimer != NULL; + // create new + pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo); + if(pRepo->pthread == NULL) { + tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo)); + return false; + } + return true; } bool tsdbAllowNewBlock(STsdbRepo* pRepo) { int32_t nMaxElastic = pRepo->config.totalBlocks/3; STsdbBufPool* pPool = pRepo->pPool; if(pPool->nElasticBlocks >= nMaxElastic) { - tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); + tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic); return false; } return true; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 099e369de9..c2021963e0 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -17,6 +17,7 @@ #include "taosdef.h" #include "tsdbint.h" #include "ttimer.h" +#include "tthread.h" #define IS_VALID_PRECISION(precision) \ (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) @@ -127,9 +128,9 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { terrno = TSDB_CODE_SUCCESS; tsdbStopStream(pRepo); - if(pRepo->tmrCtrl){ - taosTmrCleanUp(pRepo->tmrCtrl); - pRepo->tmrCtrl = NULL; + if(pRepo->pthread){ + taosDestoryThread(pRepo->pthread); + pRepo->pthread = NULL; } if (toCommit) { @@ -552,7 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { pRepo->appH = *pAppH; } pRepo->repoLocked = false; - pRepo->tmrCtrl = NULL; + pRepo->pthread = NULL; int code = pthread_mutex_init(&(pRepo->mutex), NULL); if (code != 0) { diff --git a/src/util/inc/tthread.h b/src/util/inc/tthread.h new file mode 100644 index 0000000000..7443ad706d --- /dev/null +++ b/src/util/inc/tthread.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TTHREAD_H +#define TDENGINE_TTHREAD_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "taosdef.h" + +// create new thread +pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param); +// destory thread +bool taosDestoryThread(pthread_t* pthread); +// thread running return true +bool taosThreadRunning(pthread_t* pthread); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TTHREAD_H diff --git a/src/util/src/tthread.c b/src/util/src/tthread.c new file mode 100644 index 0000000000..043b2de2f2 --- /dev/null +++ b/src/util/src/tthread.c @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tthread.h" +#include "tglobal.h" +#include "taosdef.h" +#include "tutil.h" +#include "tulog.h" +#include "taoserror.h" + +// create new thread +pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) { + pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t)); + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + int32_t ret = pthread_create(pthread, &thattr, __start_routine, param); + pthread_attr_destroy(&thattr); + + if (ret != 0) { + free(pthread); + return NULL; + } + return pthread; +} + +// destory thread +bool taosDestoryThread(pthread_t* pthread) { + if(pthread == NULL) return false; + if(taosThreadRunning(pthread)) { + pthread_cancel(*pthread); + pthread_join(*pthread, NULL); + } + + free(pthread); + return true; +} + +// thread running return true +bool taosThreadRunning(pthread_t* pthread) { + if(pthread == NULL) return false; + int ret = pthread_kill(*pthread, 0); + if(ret == ESRCH) + return false; + if(ret == EINVAL) + return false; + // alive + return true; +}