From 9cd7c54e252606290bdb1b1ee77003c162ffbd9b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 12 Aug 2022 10:41:02 +0800 Subject: [PATCH 1/2] fix: fix task sched crash issue --- include/util/tsched.h | 21 ++++++++++++++- source/libs/index/src/index.c | 2 +- source/libs/qcom/src/queryUtil.c | 10 +++---- source/util/src/tsched.c | 46 ++++++++++++++------------------ source/util/src/ttimer.c | 2 +- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/include/util/tsched.h b/include/util/tsched.h index 3bf740f528..347cacd191 100644 --- a/include/util/tsched.h +++ b/include/util/tsched.h @@ -17,6 +17,7 @@ #define _TD_UTIL_SCHED_H_ #include "os.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -30,6 +31,24 @@ typedef struct SSchedMsg { void *thandle; } SSchedMsg; + +typedef struct { + char label[TSDB_LABEL_LEN]; + tsem_t emptySem; + tsem_t fullSem; + TdThreadMutex queueMutex; + int32_t fullSlot; + int32_t emptySlot; + int32_t queueSize; + int32_t numOfThreads; + TdThread *qthread; + SSchedMsg *queue; + int8_t stop; + void *pTmrCtrl; + void *pTimer; +} SSchedQueue; + + /** * Create a thread-safe ring-buffer based task queue and return the instance. A thread * pool will be created to consume the messages in the queue. @@ -38,7 +57,7 @@ typedef struct SSchedMsg { * @param label the label of the queue * @return the created queue scheduler */ -void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label); +void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched); /** * Create a thread-safe ring-buffer based task queue and return the instance. diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 850ddd4970..eac585c2e6 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -62,7 +62,7 @@ static void indexDestroy(void* sIdx); void indexInit() { // refactor later - indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL); indexRefMgt = taosOpenRef(1000, indexDestroy); } void indexCleanup() { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 41333e7756..5143aa4af1 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static void* pTaskQueue = NULL; +static SSchedQueue pTaskQueue = {0}; int32_t initTaskQueue() { int32_t queueSize = tsMaxShellConns * 2; - pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc"); - if (NULL == pTaskQueue) { + void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); + if (NULL == p) { qError("failed to init task queue"); return -1; } @@ -111,7 +111,7 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - taosCleanUpScheduler(pTaskQueue); + taosCleanUpScheduler(&pTaskQueue); return 0; } @@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) schedMsg.thandle = execParam; schedMsg.msg = code; - taosScheduleTask(pTaskQueue, &schedMsg); + taosScheduleTask(&pTaskQueue, &schedMsg); return 0; } diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 9abce966f5..89471c4347 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -22,30 +22,16 @@ #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. -typedef struct { - char label[TSDB_LABEL_LEN]; - tsem_t emptySem; - tsem_t fullSem; - TdThreadMutex queueMutex; - int32_t fullSlot; - int32_t emptySlot; - int32_t queueSize; - int32_t numOfThreads; - TdThread *qthread; - SSchedMsg *queue; - bool stop; - void *pTmrCtrl; - void *pTimer; -} SSchedQueue; - static void *taosProcessSchedQueue(void *param); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); -void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) { - SSchedQueue *pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); - if (pSched == NULL) { - uError("%s: no enough memory for pSched", label); - return NULL; +void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) { + if (NULL == pSched) { + pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1); + if (pSched == NULL) { + uError("%s: no enough memory for pSched", label); + return NULL; + } } pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize); @@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab return NULL; } - pSched->stop = false; + atomic_store_8(&pSched->stop, 0); for (int32_t i = 0; i < numOfThreads; ++i) { TdThreadAttr attr; taosThreadAttrInit(&attr); @@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab } void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) { - SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label); + SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL); if (tmrCtrl != NULL && pSched != NULL) { pSched->pTmrCtrl = tmrCtrl; @@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) { uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); ASSERT(0); } - if (pSched->stop) { + if (atomic_load_8(&pSched->stop)) { break; } @@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { return; } + if (atomic_load_8(&pSched->stop)) { + uError("sched is already stopped, msg:%p is dropped", pMsg); + return; + } + if ((ret = tsem_wait(&pSched->emptySem)) != 0) { uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); ASSERT(0); @@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) { uDebug("start to cleanup %s schedQsueue", pSched->label); - pSched->stop = true; + atomic_store_8(&pSched->stop, 1); + + taosMsleep(200); + for (int32_t i = 0; i < pSched->numOfThreads; ++i) { if (taosCheckPthreadValid(pSched->qthread[i])) { tsem_post(&pSched->fullSem); @@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) { if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->qthread) taosMemoryFree(pSched->qthread); - taosMemoryFree(pSched); // fix memory leak + //taosMemoryFree(pSched); } // for debug purpose, dump the scheduler status every 1min. diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 7256331c85..ff5ac1b217 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) { return; } - tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); + tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL); taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads); From d5072a8f323899edfb763ab397bfe6f59a67cfbc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 12 Aug 2022 11:36:49 +0800 Subject: [PATCH 2/2] fix: fix sched handler memory leak issue --- source/libs/index/src/index.c | 1 + source/util/src/ttimer.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index eac585c2e6..be64a8b44d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -68,6 +68,7 @@ void indexInit() { void indexCleanup() { // refacto later taosCleanUpScheduler(indexQhandle); + taosMemoryFreeClear(indexQhandle); taosCloseRef(indexRefMgt); } diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index ff5ac1b217..3a868c7f97 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -606,6 +606,7 @@ void taosTmrCleanUp(void* handle) { taosUninitTimer(); taosCleanUpScheduler(tmrQhandle); + taosMemoryFreeClear(tmrQhandle); for (int32_t i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i;