Merge pull request #695 from taosdata/feature/newtimer
Feature/newtimer
This commit is contained in:
commit
9eabf5fd0a
|
@ -169,6 +169,8 @@ extern uint32_t debugFlag;
|
||||||
extern uint32_t odbcdebugFlag;
|
extern uint32_t odbcdebugFlag;
|
||||||
extern uint32_t qdebugFlag;
|
extern uint32_t qdebugFlag;
|
||||||
|
|
||||||
|
extern uint32_t taosMaxTmrCtrl;
|
||||||
|
|
||||||
extern int tsRpcTimer;
|
extern int tsRpcTimer;
|
||||||
extern int tsRpcMaxTime;
|
extern int tsRpcMaxTime;
|
||||||
extern int tsUdpDelay;
|
extern int tsUdpDelay;
|
||||||
|
|
|
@ -25,6 +25,7 @@ typedef void (*TAOS_TMR_CALLBACK)(void *, void *);
|
||||||
|
|
||||||
extern uint32_t tmrDebugFlag;
|
extern uint32_t tmrDebugFlag;
|
||||||
extern int taosTmrThreads;
|
extern int taosTmrThreads;
|
||||||
|
extern uint32_t taosMaxTmrCtrl;
|
||||||
|
|
||||||
#define tmrError(...) \
|
#define tmrError(...) \
|
||||||
do { if (tmrDebugFlag & DEBUG_ERROR) { \
|
do { if (tmrDebugFlag & DEBUG_ERROR) { \
|
||||||
|
@ -41,7 +42,6 @@ extern int taosTmrThreads;
|
||||||
tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \
|
tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \
|
||||||
} } while(0)
|
} } while(0)
|
||||||
|
|
||||||
#define MAX_NUM_OF_TMRCTL 512
|
|
||||||
#define MSECONDS_PER_TICK 5
|
#define MSECONDS_PER_TICK 5
|
||||||
|
|
||||||
void *taosTmrInit(int maxTmr, int resoultion, int longest, const char *label);
|
void *taosTmrInit(int maxTmr, int resoultion, int longest, const char *label);
|
||||||
|
|
|
@ -536,6 +536,11 @@ void tsInitGlobalConfig() {
|
||||||
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
// 0-any, 1-mgmt, 2-dnode
|
// 0-any, 1-mgmt, 2-dnode
|
||||||
|
|
||||||
|
// timer
|
||||||
|
tsInitConfigOption(cfg++, "maxTmrCtrl", &taosMaxTmrCtrl, TSDB_CFG_VTYPE_INT,
|
||||||
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
|
||||||
|
8, 2048, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
|
|
||||||
// time
|
// time
|
||||||
tsInitConfigOption(cfg++, "monitorInterval", &tsMonitorInterval, TSDB_CFG_VTYPE_INT,
|
tsInitConfigOption(cfg++, "monitorInterval", &tsMonitorInterval, TSDB_CFG_VTYPE_INT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG,
|
TSDB_CFG_CTYPE_B_CONFIG,
|
||||||
|
|
|
@ -82,12 +82,15 @@ typedef struct time_wheel_t {
|
||||||
} time_wheel_t;
|
} time_wheel_t;
|
||||||
|
|
||||||
uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE;
|
uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE;
|
||||||
|
uint32_t taosMaxTmrCtrl = 512;
|
||||||
|
|
||||||
static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT;
|
static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT;
|
||||||
static pthread_mutex_t tmrCtrlMutex;
|
static pthread_mutex_t tmrCtrlMutex;
|
||||||
static tmr_ctrl_t tmrCtrls[MAX_NUM_OF_TMRCTL];
|
static tmr_ctrl_t* tmrCtrls;
|
||||||
static tmr_ctrl_t* unusedTmrCtrl = NULL;
|
static tmr_ctrl_t* unusedTmrCtrl = NULL;
|
||||||
void* tmrQhandle;
|
static void* tmrQhandle;
|
||||||
|
static int numOfTmrCtrl = 0;
|
||||||
|
|
||||||
int taosTmrThreads = 1;
|
int taosTmrThreads = 1;
|
||||||
|
|
||||||
static uintptr_t nextTimerId = 0;
|
static uintptr_t nextTimerId = 0;
|
||||||
|
@ -129,7 +132,7 @@ static void unlockTimerList(timer_list_t* list) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetPthreadId();
|
||||||
if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) {
|
if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) {
|
||||||
assert(false);
|
assert(false);
|
||||||
tmrError("trying to unlock a timer list not locked by current thread.");
|
tmrError("%d trying to unlock a timer list not locked by current thread.", tid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,13 +259,13 @@ static void processExpiredTimer(void* handle, void* arg) {
|
||||||
timer->executedBy = taosGetPthreadId();
|
timer->executedBy = taosGetPthreadId();
|
||||||
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
|
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
|
||||||
if (state == TIMER_STATE_WAITING) {
|
if (state == TIMER_STATE_WAITING) {
|
||||||
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution start.";
|
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start.";
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
|
|
||||||
(*timer->fp)(timer->param, (tmr_h)timer->id);
|
(*timer->fp)(timer->param, (tmr_h)timer->id);
|
||||||
atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
|
atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
|
||||||
|
|
||||||
fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution end.";
|
fmt = "%s timer[id=%lld, fp=%p, param=%p] execution end.";
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
}
|
}
|
||||||
removeTimer(timer->id);
|
removeTimer(timer->id);
|
||||||
|
@ -270,18 +273,21 @@ static void processExpiredTimer(void* handle, void* arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addToExpired(tmr_obj_t* head) {
|
static void addToExpired(tmr_obj_t* head) {
|
||||||
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] expired";
|
const char* fmt = "%s adding expired timer[id=%lld, fp=%p, param=%p] to queue.";
|
||||||
|
|
||||||
while (head != NULL) {
|
while (head != NULL) {
|
||||||
tmrTrace(fmt, head->ctrl->label, head->id, head->fp, head->param);
|
uintptr_t id = head->id;
|
||||||
|
|
||||||
tmr_obj_t* next = head->next;
|
tmr_obj_t* next = head->next;
|
||||||
|
tmrTrace(fmt, head->ctrl->label, id, head->fp, head->param);
|
||||||
|
|
||||||
SSchedMsg schedMsg;
|
SSchedMsg schedMsg;
|
||||||
schedMsg.fp = NULL;
|
schedMsg.fp = NULL;
|
||||||
schedMsg.tfp = processExpiredTimer;
|
schedMsg.tfp = processExpiredTimer;
|
||||||
schedMsg.ahandle = head;
|
schedMsg.ahandle = head;
|
||||||
schedMsg.thandle = NULL;
|
schedMsg.thandle = NULL;
|
||||||
taosScheduleTask(tmrQhandle, &schedMsg);
|
taosScheduleTask(tmrQhandle, &schedMsg);
|
||||||
|
|
||||||
|
tmrTrace("timer[id=%lld] has been added to queue.", id);
|
||||||
head = next;
|
head = next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -295,7 +301,7 @@ static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int msecon
|
||||||
timer->ctrl = ctrl;
|
timer->ctrl = ctrl;
|
||||||
addTimer(timer);
|
addTimer(timer);
|
||||||
|
|
||||||
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] started";
|
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] started";
|
||||||
tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
|
|
||||||
if (mseconds == 0) {
|
if (mseconds == 0) {
|
||||||
|
@ -318,7 +324,7 @@ tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle
|
||||||
|
|
||||||
tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
|
tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
tmrError("failed to allocated memory for new timer object.");
|
tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,7 +395,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
|
||||||
// we cannot guarantee the thread safety of the timr in all other cases.
|
// we cannot guarantee the thread safety of the timr in all other cases.
|
||||||
reusable = true;
|
reusable = true;
|
||||||
}
|
}
|
||||||
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] is cancelled.";
|
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] is cancelled.";
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
} else if (state != TIMER_STATE_EXPIRED) {
|
} else if (state != TIMER_STATE_EXPIRED) {
|
||||||
// timer already stopped or cancelled, has nothing to do in this case
|
// timer already stopped or cancelled, has nothing to do in this case
|
||||||
|
@ -400,7 +406,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
|
||||||
} else {
|
} else {
|
||||||
assert(timer->executedBy != taosGetPthreadId());
|
assert(timer->executedBy != taosGetPthreadId());
|
||||||
|
|
||||||
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] fired, waiting...";
|
const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] fired, waiting...";
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
|
|
||||||
for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) {
|
for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) {
|
||||||
|
@ -409,7 +415,7 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] stopped.";
|
fmt = "%s timer[id=%lld, fp=%p, param=%p] stopped.";
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -448,7 +454,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
|
||||||
bool stopped = false;
|
bool stopped = false;
|
||||||
tmr_obj_t* timer = findTimer(id);
|
tmr_obj_t* timer = findTimer(id);
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
tmrTrace("timer[id=%lld] does not exist", id);
|
tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id);
|
||||||
} else {
|
} else {
|
||||||
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
|
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
|
||||||
if (!doStopTimer(timer, state)) {
|
if (!doStopTimer(timer, state)) {
|
||||||
|
@ -463,7 +469,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
|
||||||
return stopped;
|
return stopped;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmrTrace("timer[id=%lld] is reused", timer->id);
|
tmrTrace("%s timer[id=%lld] is reused", ctrl->label, timer->id);
|
||||||
|
|
||||||
// wait until there's no other reference to this timer,
|
// wait until there's no other reference to this timer,
|
||||||
// so that we can reuse this timer safely.
|
// so that we can reuse this timer safely.
|
||||||
|
@ -481,7 +487,13 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosTmrModuleInit(void) {
|
static void taosTmrModuleInit(void) {
|
||||||
for (int i = 0; i < tListLen(tmrCtrls) - 1; ++i) {
|
tmrCtrls = malloc(sizeof(tmr_ctrl_t) * taosMaxTmrCtrl);
|
||||||
|
if (tmrCtrls == NULL) {
|
||||||
|
tmrError("failed to allocate memory for timer controllers.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < taosMaxTmrCtrl - 1; ++i) {
|
||||||
tmr_ctrl_t* ctrl = tmrCtrls + i;
|
tmr_ctrl_t* ctrl = tmrCtrls + i;
|
||||||
ctrl->next = ctrl + 1;
|
ctrl->next = ctrl + 1;
|
||||||
}
|
}
|
||||||
|
@ -526,17 +538,18 @@ void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* lab
|
||||||
tmr_ctrl_t* ctrl = unusedTmrCtrl;
|
tmr_ctrl_t* ctrl = unusedTmrCtrl;
|
||||||
if (ctrl != NULL) {
|
if (ctrl != NULL) {
|
||||||
unusedTmrCtrl = ctrl->next;
|
unusedTmrCtrl = ctrl->next;
|
||||||
|
numOfTmrCtrl++;
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&tmrCtrlMutex);
|
pthread_mutex_unlock(&tmrCtrlMutex);
|
||||||
|
|
||||||
if (ctrl == NULL) {
|
if (ctrl == NULL) {
|
||||||
tmrError("too many timer controllers, failed to create timer controller[label=%s].", label);
|
tmrError("%s too many timer controllers, failed to create timer controller.", label);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
strncpy(ctrl->label, label, sizeof(ctrl->label));
|
strncpy(ctrl->label, label, sizeof(ctrl->label));
|
||||||
ctrl->label[sizeof(ctrl->label) - 1] = 0;
|
ctrl->label[sizeof(ctrl->label) - 1] = 0;
|
||||||
tmrTrace("timer controller[label=%s] is initialized.", label);
|
tmrTrace("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
|
||||||
return ctrl;
|
return ctrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -544,11 +557,12 @@ void taosTmrCleanUp(void* handle) {
|
||||||
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
|
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
|
||||||
assert(ctrl != NULL && ctrl->label[0] != 0);
|
assert(ctrl != NULL && ctrl->label[0] != 0);
|
||||||
|
|
||||||
tmrTrace("timer controller[label=%s] is cleaned up.", ctrl->label);
|
tmrTrace("%s timer controller is cleaned up.", ctrl->label);
|
||||||
ctrl->label[0] = 0;
|
ctrl->label[0] = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&tmrCtrlMutex);
|
pthread_mutex_lock(&tmrCtrlMutex);
|
||||||
ctrl->next = unusedTmrCtrl;
|
ctrl->next = unusedTmrCtrl;
|
||||||
|
numOfTmrCtrl--;
|
||||||
unusedTmrCtrl = ctrl;
|
unusedTmrCtrl = ctrl;
|
||||||
pthread_mutex_unlock(&tmrCtrlMutex);
|
pthread_mutex_unlock(&tmrCtrlMutex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue