From e4956e3f03452ccbb76e54f5039dc80307cd30e2 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 12 Apr 2020 10:54:10 +0800 Subject: [PATCH] remove invalid read in hash.c remove goto in tsched.c --- src/util/src/hash.c | 2 +- src/util/src/tsched.c | 111 ++++++++++++++++++++++-------------------- 2 files changed, 58 insertions(+), 55 deletions(-) diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 9cad14e8c7..03e155845e 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { */ static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize, uint32_t hashVal) { - size_t totalSize = dataSize + sizeof(SHashNode) + keyLen; + size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1; // one extra byte for null SHashNode *pNewNode = calloc(1, totalSize); if (pNewNode == NULL) { diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 11735d91b4..8608b64057 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -40,68 +40,68 @@ static void *taosProcessSchedQueue(void *param); static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { - pthread_attr_t attr; - SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); + SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1); if (pSched == NULL) { - pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno)); - goto _error; + pError("%s: no enough memory for pSched", label); + return NULL; + } + + pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize); + if (pSched->queue == NULL) { + pError("%s: no enough memory for queue", label); + taosCleanUpScheduler(pSched); + return NULL; + } + + pSched->qthread = calloc(sizeof(pthread_t), numOfThreads); + if (pSched->qthread == NULL) { + pError("%s: no enough memory for qthread", label); + taosCleanUpScheduler(pSched); + return NULL; } - memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow pSched->label[sizeof(pSched->label)-1] = '\0'; - if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { - pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { - pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if (tsem_init(&pSched->fullSem, 0, 0) != 0) { - pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { - pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno)); - goto _error; - } - - memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0; pSched->emptySlot = 0; - pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads); - if (pSched->qthread == NULL) { - pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno)); - goto _error; + if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { + pError("init %s:queueMutex failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { + pError("init %s:empty semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } + + if (tsem_init(&pSched->fullSem, 0, 0) != 0) { + pError("init %s:full semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } for (int i = 0; i < numOfThreads; ++i) { - if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { - pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); - goto _error; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched); + pthread_attr_destroy(&attr); + if (code != 0) { + pError("%s: failed to create rpc thread(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } ++pSched->numOfThreads; } - pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); - pthread_attr_destroy(&attr); + pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads); return (void *)pSched; - -_error: - pthread_attr_destroy(&attr); - taosCleanUpScheduler(pSched); - return NULL; } void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { @@ -126,21 +126,21 @@ void *taosProcessSchedQueue(void *param) { pTrace("wait %s fullSem was interrupted", pSched->label); continue; } - pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); + pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno)); + pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); if (tsem_post(&pSched->emptySem) != 0) - pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno)); + pError("post %s emptySem failed(%s)", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); @@ -160,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { while (tsem_wait(&pSched->emptySem) != 0) { if (errno != EINTR) { - pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); + pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); break; } pTrace("wait %s emptySem was interrupted", pSched->label); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno)); + if (tsem_post(&pSched->fullSem) != 0) + pError("post %s fullSem failed(%s)", pSched->label, strerror(errno)); return 0; } @@ -185,10 +186,12 @@ void taosCleanUpScheduler(void *param) { if (pSched == NULL) return; for (int i = 0; i < pSched->numOfThreads; ++i) { - pthread_cancel(pSched->qthread[i]); + if (pSched->qthread[i]) + pthread_cancel(pSched->qthread[i]); } for (int i = 0; i < pSched->numOfThreads; ++i) { - pthread_join(pSched->qthread[i], NULL); + if (pSched->qthread[i]) + pthread_join(pSched->qthread[i], NULL); } tsem_destroy(&pSched->emptySem); @@ -199,8 +202,8 @@ void taosCleanUpScheduler(void *param) { taosTmrStopA(&pSched->pTimer); } - free(pSched->queue); - free(pSched->qthread); + if (pSched->queue) free(pSched->queue); + if (pSched->qthread) free(pSched->qthread); free(pSched); // fix memory leak }