enhance robustness of scheduler
1. check all memory allocation failure in `taosInitScheduler`; 2. make `pSched->numOfThreads` the actual number of created threads to avoid crash in `taosCleanUpScheduler` when `pSched->qthread` is NULL and other issues; 3. check interruption of `sem_wait` in `taosScheduleTask`, and use `pTrace` instead of `pError` when `sem_wait` was interrupted; 4. cancel all threads before join them to enable cocurrent cancellation; 5. remove unused global variable.
This commit is contained in:
parent
747e93260c
commit
83eb20d3b3
|
@ -38,17 +38,19 @@ typedef struct {
|
|||
SSchedMsg * queue;
|
||||
} SSchedQueue;
|
||||
|
||||
void (*taosSchedFp[128])(SSchedMsg *msg) = {0};
|
||||
void *taosProcessSchedQueue(void *param);
|
||||
void taosCleanUpScheduler(void *param);
|
||||
|
||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
||||
pthread_attr_t attr;
|
||||
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
|
||||
if (pSched == NULL) {
|
||||
pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
memset(pSched, 0, sizeof(SSchedQueue));
|
||||
pSched->queueSize = queueSize;
|
||||
pSched->numOfThreads = numOfThreads;
|
||||
strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
|
||||
pSched->label[sizeof(pSched->label)-1] = '\0';
|
||||
|
||||
|
@ -76,16 +78,21 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
|||
pSched->fullSlot = 0;
|
||||
pSched->emptySlot = 0;
|
||||
|
||||
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
|
||||
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
|
||||
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
++pSched->numOfThreads;
|
||||
}
|
||||
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
|
||||
|
@ -103,11 +110,12 @@ void *taosProcessSchedQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (sem_wait(&pSched->fullSem) != 0) {
|
||||
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
|
||||
if (errno == EINTR) {
|
||||
/* sem_wait is interrupted by interrupt, ignore and continue */
|
||||
pTrace("wait %s fullSem was interrupted", pSched->label);
|
||||
continue;
|
||||
}
|
||||
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
|
@ -137,7 +145,13 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
|
||||
while (sem_wait(&pSched->emptySem) != 0) {
|
||||
if (errno != EINTR) {
|
||||
pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
pTrace("wait %s emptySem was interrupted", pSched->label);
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
|
@ -159,6 +173,8 @@ void taosCleanUpScheduler(void *param) {
|
|||
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
}
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue