Merge pull request #16031 from taosdata/fix/TD-18351
fix: fix task sched crash issue
This commit is contained in:
commit
6a85467d49
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_UTIL_SCHED_H_
|
#define _TD_UTIL_SCHED_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -30,6 +31,24 @@ typedef struct SSchedMsg {
|
||||||
void *thandle;
|
void *thandle;
|
||||||
} SSchedMsg;
|
} SSchedMsg;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char label[TSDB_LABEL_LEN];
|
||||||
|
tsem_t emptySem;
|
||||||
|
tsem_t fullSem;
|
||||||
|
TdThreadMutex queueMutex;
|
||||||
|
int32_t fullSlot;
|
||||||
|
int32_t emptySlot;
|
||||||
|
int32_t queueSize;
|
||||||
|
int32_t numOfThreads;
|
||||||
|
TdThread *qthread;
|
||||||
|
SSchedMsg *queue;
|
||||||
|
int8_t stop;
|
||||||
|
void *pTmrCtrl;
|
||||||
|
void *pTimer;
|
||||||
|
} SSchedQueue;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
|
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
|
||||||
* pool will be created to consume the messages in the queue.
|
* pool will be created to consume the messages in the queue.
|
||||||
|
@ -38,7 +57,7 @@ typedef struct SSchedMsg {
|
||||||
* @param label the label of the queue
|
* @param label the label of the queue
|
||||||
* @return the created queue scheduler
|
* @return the created queue scheduler
|
||||||
*/
|
*/
|
||||||
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label);
|
void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a thread-safe ring-buffer based task queue and return the instance.
|
* Create a thread-safe ring-buffer based task queue and return the instance.
|
||||||
|
|
|
@ -62,12 +62,13 @@ static void indexDestroy(void* sIdx);
|
||||||
|
|
||||||
void indexInit() {
|
void indexInit() {
|
||||||
// refactor later
|
// refactor later
|
||||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL);
|
||||||
indexRefMgt = taosOpenRef(1000, indexDestroy);
|
indexRefMgt = taosOpenRef(1000, indexDestroy);
|
||||||
}
|
}
|
||||||
void indexCleanup() {
|
void indexCleanup() {
|
||||||
// refactor later
|
// refactor later
|
||||||
taosCleanUpScheduler(indexQhandle);
|
taosCleanUpScheduler(indexQhandle);
|
||||||
|
taosMemoryFreeClear(indexQhandle);
|
||||||
taosCloseRef(indexRefMgt);
|
taosCloseRef(indexRefMgt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,12 +96,12 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* pTaskQueue = NULL;
|
static SSchedQueue pTaskQueue = {0};
|
||||||
|
|
||||||
int32_t initTaskQueue() {
|
int32_t initTaskQueue() {
|
||||||
int32_t queueSize = tsMaxShellConns * 2;
|
int32_t queueSize = tsMaxShellConns * 2;
|
||||||
pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc");
|
void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue);
|
||||||
if (NULL == pTaskQueue) {
|
if (NULL == p) {
|
||||||
qError("failed to init task queue");
|
qError("failed to init task queue");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ int32_t initTaskQueue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cleanupTaskQueue() {
|
int32_t cleanupTaskQueue() {
|
||||||
taosCleanUpScheduler(pTaskQueue);
|
taosCleanUpScheduler(&pTaskQueue);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
|
||||||
schedMsg.thandle = execParam;
|
schedMsg.thandle = execParam;
|
||||||
schedMsg.msg = code;
|
schedMsg.msg = code;
|
||||||
|
|
||||||
taosScheduleTask(pTaskQueue, &schedMsg);
|
taosScheduleTask(&pTaskQueue, &schedMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,31 +22,17 @@
|
||||||
|
|
||||||
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
|
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char label[TSDB_LABEL_LEN];
|
|
||||||
tsem_t emptySem;
|
|
||||||
tsem_t fullSem;
|
|
||||||
TdThreadMutex queueMutex;
|
|
||||||
int32_t fullSlot;
|
|
||||||
int32_t emptySlot;
|
|
||||||
int32_t queueSize;
|
|
||||||
int32_t numOfThreads;
|
|
||||||
TdThread *qthread;
|
|
||||||
SSchedMsg *queue;
|
|
||||||
bool stop;
|
|
||||||
void *pTmrCtrl;
|
|
||||||
void *pTimer;
|
|
||||||
} SSchedQueue;
|
|
||||||
|
|
||||||
static void *taosProcessSchedQueue(void *param);
|
static void *taosProcessSchedQueue(void *param);
|
||||||
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||||
|
|
||||||
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label) {
|
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
|
||||||
SSchedQueue *pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
|
if (NULL == pSched) {
|
||||||
|
pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
|
||||||
if (pSched == NULL) {
|
if (pSched == NULL) {
|
||||||
uError("%s: no enough memory for pSched", label);
|
uError("%s: no enough memory for pSched", label);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
|
pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
|
||||||
if (pSched->queue == NULL) {
|
if (pSched->queue == NULL) {
|
||||||
|
@ -86,7 +72,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSched->stop = false;
|
atomic_store_8(&pSched->stop, 0);
|
||||||
for (int32_t i = 0; i < numOfThreads; ++i) {
|
for (int32_t i = 0; i < numOfThreads; ++i) {
|
||||||
TdThreadAttr attr;
|
TdThreadAttr attr;
|
||||||
taosThreadAttrInit(&attr);
|
taosThreadAttrInit(&attr);
|
||||||
|
@ -107,7 +93,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
|
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
|
||||||
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label);
|
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL);
|
||||||
|
|
||||||
if (tmrCtrl != NULL && pSched != NULL) {
|
if (tmrCtrl != NULL && pSched != NULL) {
|
||||||
pSched->pTmrCtrl = tmrCtrl;
|
pSched->pTmrCtrl = tmrCtrl;
|
||||||
|
@ -131,7 +117,7 @@ void *taosProcessSchedQueue(void *scheduler) {
|
||||||
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
if (pSched->stop) {
|
if (atomic_load_8(&pSched->stop)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +158,11 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (atomic_load_8(&pSched->stop)) {
|
||||||
|
uError("sched is already stopped, msg:%p is dropped", pMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
|
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
|
||||||
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
|
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -202,7 +193,10 @@ void taosCleanUpScheduler(void *param) {
|
||||||
|
|
||||||
uDebug("start to cleanup %s schedQsueue", pSched->label);
|
uDebug("start to cleanup %s schedQsueue", pSched->label);
|
||||||
|
|
||||||
pSched->stop = true;
|
atomic_store_8(&pSched->stop, 1);
|
||||||
|
|
||||||
|
taosMsleep(200);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
||||||
if (taosCheckPthreadValid(pSched->qthread[i])) {
|
if (taosCheckPthreadValid(pSched->qthread[i])) {
|
||||||
tsem_post(&pSched->fullSem);
|
tsem_post(&pSched->fullSem);
|
||||||
|
@ -226,7 +220,7 @@ void taosCleanUpScheduler(void *param) {
|
||||||
|
|
||||||
if (pSched->queue) taosMemoryFree(pSched->queue);
|
if (pSched->queue) taosMemoryFree(pSched->queue);
|
||||||
if (pSched->qthread) taosMemoryFree(pSched->qthread);
|
if (pSched->qthread) taosMemoryFree(pSched->qthread);
|
||||||
taosMemoryFree(pSched); // fix memory leak
|
//taosMemoryFree(pSched);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for debug purpose, dump the scheduler status every 1min.
|
// for debug purpose, dump the scheduler status every 1min.
|
||||||
|
|
|
@ -555,7 +555,7 @@ static void taosTmrModuleInit(void) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
|
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
|
||||||
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
|
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
|
||||||
|
|
||||||
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
|
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
|
||||||
|
@ -606,6 +606,7 @@ void taosTmrCleanUp(void* handle) {
|
||||||
taosUninitTimer();
|
taosUninitTimer();
|
||||||
|
|
||||||
taosCleanUpScheduler(tmrQhandle);
|
taosCleanUpScheduler(tmrQhandle);
|
||||||
|
taosMemoryFreeClear(tmrQhandle);
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(wheels); i++) {
|
for (int32_t i = 0; i < tListLen(wheels); i++) {
|
||||||
time_wheel_t* wheel = wheels + i;
|
time_wheel_t* wheel = wheels + i;
|
||||||
|
|
Loading…
Reference in New Issue