Rename file and add some comment/changes (#6587)

1. rename semphone to semaphore
2. add comment for tsched.h
3. change the function signature for taosSchedulerTask, changing from
return int to void. We currently don't check any return code of the function.
4. add some error handlings. For fatal error, just exit the process because
the program may run into a random state.
This commit is contained in:
Jun Li 2021-06-27 06:13:33 -07:00 committed by GitHub
parent 329a0af28d
commit 9a06248432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 33 deletions

View File

@ -29,7 +29,7 @@ extern "C" {
#include "osMath.h"
#include "osMemory.h"
#include "osRand.h"
#include "osSemphone.h"
#include "osSemaphore.h"
#include "osSignal.h"
#include "osSleep.h"
#include "osSocket.h"

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SEMPHONE_H
#define TDENGINE_OS_SEMPHONE_H
#ifndef TDENGINE_OS_SEMAPHORE_H
#define TDENGINE_OS_SEMAPHORE_H
#ifdef __cplusplus
extern "C" {

View File

@ -28,10 +28,41 @@ typedef struct SSchedMsg {
void *thandle;
} SSchedMsg;
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
void taosCleanUpScheduler(void *param);
/**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @return the created queue scheduler
*/
void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
/**
* Create a thread-safe ring-buffer based task queue and return the instance.
* Same as taosInitScheduler, and it also print the queue status every 1 minite.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @param tmrCtrl the timer controller, tmr_ctrl_t*
* @return the created queue scheduler
*/
void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl);
/**
* Clean up the queue scheduler instance and free the memory.
* @param queueScheduler the queue scheduler to free
*/
void taosCleanUpScheduler(void *queueScheduler);
/**
* Schedule a new task to run, the task is described by pMsg.
* The function may be blocked if no thread is available to execute the task.
* That may happen when all threads are busy.
* @param queueScheduler the queue scheduler instance
* @param pMsg the message for the task
*/
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
#ifdef __cplusplus
}

View File

@ -108,39 +108,47 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
return pSched;
}
void *taosProcessSchedQueue(void *param) {
void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0;
while (1) {
if (tsem_wait(&pSched->fullSem) != 0) {
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (tsem_post(&pSched->emptySem) != 0)
uError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (msg.fp)
(*(msg.fp))(&msg);
@ -151,30 +159,37 @@ void *taosProcessSchedQueue(void *param) {
return NULL;
}
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
int ret = 0;
if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
return;
}
if (tsem_wait(&pSched->emptySem) != 0) {
uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (tsem_post(&pSched->fullSem) != 0)
uError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
return 0;
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
}
void taosCleanUpScheduler(void *param) {
@ -219,4 +234,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
}