parent
a9be7a8553
commit
e4956e3f03
|
@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
|||
*/
|
||||
static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
|
||||
uint32_t hashVal) {
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
|
||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null
|
||||
|
||||
SHashNode *pNewNode = calloc(1, totalSize);
|
||||
if (pNewNode == NULL) {
|
||||
|
|
|
@ -40,68 +40,68 @@ static void *taosProcessSchedQueue(void *param);
|
|||
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||
|
||||
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
||||
pthread_attr_t attr;
|
||||
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
|
||||
SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1);
|
||||
if (pSched == NULL) {
|
||||
pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno));
|
||||
goto _error;
|
||||
pError("%s: no enough memory for pSched", label);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize);
|
||||
if (pSched->queue == NULL) {
|
||||
pError("%s: no enough memory for queue", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSched->qthread = calloc(sizeof(pthread_t), numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(pSched, 0, sizeof(SSchedQueue));
|
||||
pSched->queueSize = queueSize;
|
||||
strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
|
||||
pSched->label[sizeof(pSched->label)-1] = '\0';
|
||||
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
|
||||
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
}
|
||||
|
||||
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
|
||||
pSched->fullSlot = 0;
|
||||
pSched->emptySlot = 0;
|
||||
|
||||
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads);
|
||||
if (pSched->qthread == NULL) {
|
||||
pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
|
||||
pError("init %s:queueMutex failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
|
||||
pError("init %s:empty semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
pError("init %s:full semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
|
||||
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
|
||||
goto _error;
|
||||
pthread_attr_t attr;
|
||||
pthread_attr_init(&attr);
|
||||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
||||
int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
|
||||
pthread_attr_destroy(&attr);
|
||||
if (code != 0) {
|
||||
pError("%s: failed to create rpc thread(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
++pSched->numOfThreads;
|
||||
}
|
||||
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
|
||||
pthread_attr_destroy(&attr);
|
||||
pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads);
|
||||
|
||||
return (void *)pSched;
|
||||
|
||||
_error:
|
||||
pthread_attr_destroy(&attr);
|
||||
taosCleanUpScheduler(pSched);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
|
||||
|
@ -126,21 +126,21 @@ void *taosProcessSchedQueue(void *param) {
|
|||
pTrace("wait %s fullSem was interrupted", pSched->label);
|
||||
continue;
|
||||
}
|
||||
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
|
||||
pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
msg = pSched->queue[pSched->fullSlot];
|
||||
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
|
||||
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->emptySem) != 0)
|
||||
pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));
|
||||
pError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (msg.fp)
|
||||
(*(msg.fp))(&msg);
|
||||
|
@ -160,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
|
|||
|
||||
while (tsem_wait(&pSched->emptySem) != 0) {
|
||||
if (errno != EINTR) {
|
||||
pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
pTrace("wait %s emptySem was interrupted", pSched->label);
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
pSched->queue[pSched->emptySlot] = *pMsg;
|
||||
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
|
||||
|
||||
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
|
||||
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
|
||||
pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
|
||||
if (tsem_post(&pSched->fullSem) != 0)
|
||||
pError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -185,10 +186,12 @@ void taosCleanUpScheduler(void *param) {
|
|||
if (pSched == NULL) return;
|
||||
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
if (pSched->qthread[i])
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
}
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
if (pSched->qthread[i])
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
}
|
||||
|
||||
tsem_destroy(&pSched->emptySem);
|
||||
|
@ -199,8 +202,8 @@ void taosCleanUpScheduler(void *param) {
|
|||
taosTmrStopA(&pSched->pTimer);
|
||||
}
|
||||
|
||||
free(pSched->queue);
|
||||
free(pSched->qthread);
|
||||
if (pSched->queue) free(pSched->queue);
|
||||
if (pSched->qthread) free(pSched->qthread);
|
||||
free(pSched); // fix memory leak
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue