dump the number of tasks in the queue every 30sec.
This commit is contained in:
parent
d9a34ffeb9
commit
abfac9ab09
|
@ -32,6 +32,8 @@ typedef struct _sched_msg {
|
||||||
|
|
||||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
|
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
|
||||||
|
|
||||||
|
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
|
||||||
|
|
||||||
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
|
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
|
||||||
|
|
||||||
void taosCleanUpScheduler(void *param);
|
void taosCleanUpScheduler(void *param);
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char label[16];
|
char label[16];
|
||||||
|
@ -28,10 +31,13 @@ typedef struct {
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
pthread_t * qthread;
|
pthread_t * qthread;
|
||||||
SSchedMsg * queue;
|
SSchedMsg * queue;
|
||||||
|
|
||||||
|
void* pTmrCtrl;
|
||||||
|
void* pTimer;
|
||||||
} SSchedQueue;
|
} SSchedQueue;
|
||||||
|
|
||||||
void *taosProcessSchedQueue(void *param);
|
static void *taosProcessSchedQueue(void *param);
|
||||||
void taosCleanUpScheduler(void *param);
|
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||||
|
|
||||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
||||||
pthread_attr_t attr;
|
pthread_attr_t attr;
|
||||||
|
@ -96,6 +102,17 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
|
||||||
|
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
|
||||||
|
|
||||||
|
if (tmrCtrl != NULL && pSched != NULL) {
|
||||||
|
pSched->pTmrCtrl = tmrCtrl;
|
||||||
|
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pSched;
|
||||||
|
}
|
||||||
|
|
||||||
void *taosProcessSchedQueue(void *param) {
|
void *taosProcessSchedQueue(void *param) {
|
||||||
SSchedMsg msg;
|
SSchedMsg msg;
|
||||||
SSchedQueue *pSched = (SSchedQueue *)param;
|
SSchedQueue *pSched = (SSchedQueue *)param;
|
||||||
|
@ -173,8 +190,27 @@ void taosCleanUpScheduler(void *param) {
|
||||||
tsem_destroy(&pSched->emptySem);
|
tsem_destroy(&pSched->emptySem);
|
||||||
tsem_destroy(&pSched->fullSem);
|
tsem_destroy(&pSched->fullSem);
|
||||||
pthread_mutex_destroy(&pSched->queueMutex);
|
pthread_mutex_destroy(&pSched->queueMutex);
|
||||||
|
|
||||||
|
if (pSched->pTimer) {
|
||||||
|
taosTmrStopA(&pSched->pTimer);
|
||||||
|
}
|
||||||
|
|
||||||
free(pSched->queue);
|
free(pSched->queue);
|
||||||
free(pSched->qthread);
|
free(pSched->qthread);
|
||||||
free(pSched); // fix memory leak
|
free(pSched); // fix memory leak
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for debug purpose, dump the scheduler status every 1min.
|
||||||
|
void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
|
||||||
|
SSchedQueue *pSched = (SSchedQueue *)qhandle;
|
||||||
|
if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize;
|
||||||
|
if (size > 0) {
|
||||||
|
pTrace("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue