From a9be7a8553ab0a89c1d93d7146dfd7b0b73e66db Mon Sep 17 00:00:00 2001
From: Jeff Tao
Date: Sat, 11 Apr 2020 22:28:58 +0800
Subject: [PATCH 1/2] exit nicely

---
 src/dnode/src/dnodeRead.c      | 84 +++++++++++++++++++++++-----------
 src/dnode/src/dnodeWrite.c     | 31 +++++++++----
 src/rpc/src/rpcCache.c         |  3 +-
 src/rpc/src/rpcClient.c        |  4 +-
 src/rpc/src/rpcServer.c        |  7 ++-
 src/rpc/src/rpcUdp.c           |  4 +-
 src/util/src/ihash.c           |  1 -
 src/util/src/tqueue.c          | 17 +++++--
 src/util/src/tsched.c          |  2 +
 src/vnode/main/src/vnodeMain.c |  3 +-
 10 files changed, 106 insertions(+), 50 deletions(-)

diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c
index d4365dae10..f13c18f22a 100644
--- a/src/dnode/src/dnodeRead.c
+++ b/src/dnode/src/dnodeRead.c
@@ -32,27 +32,51 @@ typedef struct {
   SRpcMsg  rpcMsg;
 } SReadMsg;
 
+typedef struct {
+  pthread_t thread;    // thread
+  int32_t   workerId;  // worker ID
+} SReadWorker;
+
+typedef struct {
+  int32_t max;  // max number of workers
+  int32_t min;  // min number of workers
+  int32_t num;  // current number of workers
+  SReadWorker *readWorker;
+} SReadWorkerPool;
+
 static void *dnodeProcessReadQueue(void *param);
-static void  dnodeHandleIdleReadWorker();
+static void  dnodeHandleIdleReadWorker(SReadWorker *);
 
 // module global variable
-static taos_qset readQset;
-static int32_t   threads;  // number of query threads
-static int32_t   maxThreads;
-static int32_t   minThreads;
+static SReadWorkerPool readPool;
+static taos_qset       readQset;
 
 int32_t dnodeInitRead() {
   readQset = taosOpenQset();
 
-  minThreads = 3;
-  maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
-  if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
+  readPool.min = 2;
+  readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
+  if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
+  readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max);
+
+  if (readPool.readWorker == NULL) return -1;
+  for (int i=0; i < readPool.max; ++i) {
+    SReadWorker *pWorker = readPool.readWorker + i;
+    pWorker->workerId = i;
+  }
 
   dPrint("dnode read is opened");
   return 0;
 }
 
 void dnodeCleanupRead() {
+
+  for (int i=0; i < readPool.max; ++i) {
+    SReadWorker *pWorker = readPool.readWorker + i;
+    if (pWorker->thread)
+      pthread_join(pWorker->thread, NULL);
+  }
+
   taosCloseQset(readQset);
   dPrint("dnode read is closed");
 }
@@ -116,18 +140,25 @@ void *dnodeAllocateRqueue(void *pVnode) {
   taosAddIntoQset(readQset, queue, pVnode);
 
   // spawn a thread to process queue
-  if (threads < maxThreads) {
-    pthread_t thread;
-    pthread_attr_t thAttr;
-    pthread_attr_init(&thAttr);
-    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
+  if (readPool.num < readPool.max) {
+    do {
+      SReadWorker *pWorker = readPool.readWorker + readPool.num;
 
-    if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
-      dError("failed to create thread to process read queue, reason:%s", strerror(errno));
-    }
+      pthread_attr_t thAttr;
+      pthread_attr_init(&thAttr);
+      pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+      if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
+        dError("failed to create thread to process read queue, reason:%s", strerror(errno));
+      }
+
+      pthread_attr_destroy(&thAttr);
+      readPool.num++;
+      dTrace("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num);
+    } while (readPool.num < readPool.min);
   }
 
-  dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
+  dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
   return queue;
 }
@@ -167,14 +198,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
 }
 
 static void *dnodeProcessReadQueue(void *param) {
-  taos_qset  qset = (taos_qset)param;
-  SReadMsg  *pReadMsg;
-  int        type;
-  void      *pVnode;
+  SReadWorker *pWorker = param;
+  SReadMsg    *pReadMsg;
+  int          type;
+  void        *pVnode;
 
   while (1) {
-    if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
-      dnodeHandleIdleReadWorker();
+    if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
+      dnodeHandleIdleReadWorker(pWorker);
       continue;
     }
 
@@ -186,11 +217,12 @@ static void *dnodeProcessReadQueue(void *param) {
   return NULL;
 }
 
-static void dnodeHandleIdleReadWorker() {
+static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
   int32_t num = taosGetQueueNumber(readQset);
 
-  if (num == 0 || (num <= minThreads && threads > minThreads)) {
-    threads--;
+  if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
+    readPool.num--;
+    dTrace("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
     pthread_exit(NULL);
   } else {
     usleep(100);
diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c
index 3c598ca360..b56e0d8ad7 100644
--- a/src/dnode/src/dnodeWrite.c
+++ b/src/dnode/src/dnodeWrite.c
@@ -28,6 +28,7 @@
 #include "vnode.h"
 
 typedef struct {
+  taos_qall  qall;
   taos_qset  qset;      // queue set
   pthread_t  thread;    // thread
   int32_t    workerId;  // worker ID
@@ -65,6 +66,14 @@ int32_t dnodeInitWrite() {
 }
 
 void dnodeCleanupWrite() {
+
+  for (int32_t i = 0; i < wWorkerPool.max; ++i) {
+    SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
+    if (pWorker->thread) {
+      pthread_join(pWorker->thread, NULL);
+    }
+  }
+
   free(wWorkerPool.writeWorker);
   dPrint("dnode write is closed");
 }
@@ -113,6 +122,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
     if (pWorker->qset == NULL) return NULL;
 
     taosAddIntoQset(pWorker->qset, queue, pVnode);
+    pWorker->qall = taosAllocateQall();
     wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
 
     pthread_attr_t thAttr;
@@ -122,13 +132,17 @@ void *dnodeAllocateWqueue(void *pVnode) {
     if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
       dError("failed to create thread to process read queue, reason:%s", strerror(errno));
       taosCloseQset(pWorker->qset);
+    } else {
+      dTrace("write worker:%d is launched", pWorker->workerId);
     }
+
+    pthread_attr_destroy(&thAttr);
   } else {
     taosAddIntoQset(pWorker->qset, queue, pVnode);
     wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
   }
 
-  dTrace("pVnode:%p, queue:%p is allocated", pVnode, queue);
+  dTrace("pVnode:%p, write queue:%p is allocated", pVnode, queue);
   return queue;
 }
 
@@ -160,17 +174,14 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
 
 static void *dnodeProcessWriteQueue(void *param) {
   SWriteWorker *pWorker = (SWriteWorker *)param;
-  taos_qall     qall;
   SWriteMsg    *pWrite;
   SWalHead     *pHead;
   int32_t       numOfMsgs;
   int           type;
   void         *pVnode, *item;
 
-  qall = taosAllocateQall();
-
   while (1) {
-    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
+    numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
     if (numOfMsgs <=0) {
       dnodeHandleIdleWorker(pWorker);  // thread exit if no queues anymore
       continue;
@@ -178,7 +189,7 @@ static void *dnodeProcessWriteQueue(void *param) {
 
     for (int32_t i = 0; i < numOfMsgs; ++i) {
       pWrite = NULL;
-      taosGetQitem(qall, &type, &item);
+      taosGetQitem(pWorker->qall, &type, &item);
       if (type == TAOS_QTYPE_RPC) {
         pWrite = (SWriteMsg *)item;
         pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
@@ -196,9 +207,9 @@ static void *dnodeProcessWriteQueue(void *param) {
     walFsync(vnodeGetWal(pVnode));
 
     // browse all items, and process them one by one
-    taosResetQitems(qall);
+    taosResetQitems(pWorker->qall);
     for (int32_t i = 0; i < numOfMsgs; ++i) {
-      taosGetQitem(qall, &type, &item);
+      taosGetQitem(pWorker->qall, &type, &item);
       if (type == TAOS_QTYPE_RPC) {
         pWrite = (SWriteMsg *)item;
         dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
@@ -209,8 +220,6 @@ static void *dnodeProcessWriteQueue(void *param) {
     }
   }
 
-  taosFreeQall(qall);
-
   return NULL;
 }
 
@@ -221,8 +230,10 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
     usleep(1000);
     sched_yield();
   } else {
+    taosFreeQall(pWorker->qall);
     taosCloseQset(pWorker->qset);
     pWorker->qset = NULL;
+    dTrace("write worker:%d is released", pWorker->workerId);
     pthread_exit(NULL);
   }
 }
diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c
index a397f6f845..a4863ef61d 100644
--- a/src/rpc/src/rpcCache.c
+++ b/src/rpc/src/rpcCache.c
@@ -103,7 +103,8 @@ void rpcCloseConnCache(void *handle) {
   if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
 
   tfree(pCache->connHashList);
-  tfree(pCache->count)
+  tfree(pCache->count);
+  tfree(pCache->lockedBy);
 
   pthread_mutex_unlock(&pCache->mutex);
 
diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c
index b362b1ba44..264449bbb0 100644
--- a/src/rpc/src/rpcClient.c
+++ b/src/rpc/src/rpcClient.c
@@ -84,7 +84,9 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
   pthread_attr_init(&thattr);
   pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
 
-  if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
+  int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
+  pthread_attr_destroy(&thattr);
+  if (code != 0) {
     tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
     return NULL;
   }
diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c
index 1aadabc5f7..538b3059e3 100644
--- a/src/rpc/src/rpcServer.c
+++ b/src/rpc/src/rpcServer.c
@@ -83,6 +83,9 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
   }
   memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
 
+  pthread_attr_init(&thattr);
+  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
+
   pThreadObj = pServerObj->pThreadObj;
   for (i = 0; i < numOfThreads; ++i) {
     pThreadObj->processData = fp;
@@ -105,8 +108,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
       return NULL;
     }
 
-    pthread_attr_init(&thattr);
-    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
     if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
       tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
       return NULL;
@@ -116,8 +117,6 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
     pThreadObj++;
   }
 
-  pthread_attr_init(&thattr);
-  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
   if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
     tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
     return NULL;
diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c
index 64a4df0e73..785288f5b9 100644
--- a/src/rpc/src/rpcUdp.c
+++ b/src/rpc/src/rpcUdp.c
@@ -146,10 +146,12 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
     pConn->tmrCtrl = pSet->tmrCtrl;
   }
 
-  if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
+  int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
+  if (code != 0) {
     tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
     taosCloseSocket(pConn->fd);
     taosCleanUpUdpConnection(pSet);
+    pthread_attr_destroy(&thAttr);
     return NULL;
   }
diff --git a/src/util/src/ihash.c b/src/util/src/ihash.c
index 30773ae8d9..2cfadad964 100644
--- a/src/util/src/ihash.c
+++ b/src/util/src/ihash.c
@@ -189,7 +189,6 @@ void taosCleanUpIntHash(void *handle) {
     free(pObj->hashList);
   }
 
-  memset(pObj, 0, sizeof(IHashObj));
   free(pObj->lockedBy);
   free(pObj);
 }
diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c
index 2cf94267f8..b9f1141d35 100644
--- a/src/util/src/tqueue.c
+++ b/src/util/src/tqueue.c
@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
   queue->numOfItems++;
   if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
 
-  pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems);
+  pTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
 
   pthread_mutex_unlock(&queue->mutex);
 
@@ -297,14 +297,16 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
   STaosQset  *qset = (STaosQset *)param;
   STaosQnode *pNode = NULL;
   int         code = 0;
+
+  pthread_mutex_lock(&qset->mutex);
 
   for(int i=0; i<qset->numOfQueues; ++i) {
-    pthread_mutex_lock(&qset->mutex);
+    //pthread_mutex_lock(&qset->mutex);
     if (qset->current == NULL)
       qset->current = qset->head;
     STaosQueue *queue = qset->current;
     if (queue) qset->current = queue->next;
-    pthread_mutex_unlock(&qset->mutex);
+    //pthread_mutex_unlock(&qset->mutex);
     if (queue == NULL) break;
 
     pthread_mutex_lock(&queue->mutex);
@@ -326,6 +328,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
     if (pNode) break;
   }
 
+  pthread_mutex_unlock(&qset->mutex);
+
   return code;
 }
 
@@ -335,13 +339,15 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
   STaosQall  *qall = (STaosQall *)p2;
   int         code = 0;
 
+  pthread_mutex_lock(&qset->mutex);
+
   for(int i=0; i<qset->numOfQueues; ++i) {
-    pthread_mutex_lock(&qset->mutex);
+    // pthread_mutex_lock(&qset->mutex);
    if (qset->current == NULL)
       qset->current = qset->head;
     queue = qset->current;
     if (queue) qset->current = queue->next;
-    pthread_mutex_unlock(&qset->mutex);
+    // pthread_mutex_unlock(&qset->mutex);
     if (queue == NULL) break;
 
     pthread_mutex_lock(&queue->mutex);
@@ -365,6 +371,7 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
     if (code != 0) break;
   }
 
+  pthread_mutex_unlock(&qset->mutex);
   return code;
 }
 
diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c
index 56d16eeb71..11735d91b4 100644
--- a/src/util/src/tsched.c
+++ b/src/util/src/tsched.c
@@ -94,10 +94,12 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
   }
 
   pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
+  pthread_attr_destroy(&attr);
 
   return (void *)pSched;
 
 _error:
+  pthread_attr_destroy(&attr);
   taosCleanUpScheduler(pSched);
   return NULL;
 }
diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c
index ea7a003d3d..4d77e007ad 100644
--- a/src/vnode/main/src/vnodeMain.c
+++ b/src/vnode/main/src/vnodeMain.c
@@ -224,10 +224,11 @@ void vnodeRelease(void *pVnodeRaw) {
     // remove the whole directory
   }
 
-  dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
   free(pVnode);
 
   int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
+  dTrace("pVnode:%p vgId:%d, vnode is released, vnodes:%d", pVnode, vgId, count);
+
   if (count <= 0) {
     taosCleanUpIntHash(tsDnodeVnodesHash);
     vnodeModuleInit = PTHREAD_ONCE_INIT;

From e4956e3f03452ccbb76e54f5039dc80307cd30e2 Mon Sep 17 00:00:00 2001
From: Jeff Tao
Date: Sun, 12 Apr 2020 10:54:10 +0800
Subject: [PATCH 2/2] remove invalid read in hash.c remove goto in tsched.c

---
 src/util/src/hash.c   |   2 +-
 src/util/src/tsched.c | 111 ++++++++++++++++++++++--------------------
 2 files changed, 58 insertions(+), 55 deletions(-)

diff --git a/src/util/src/hash.c b/src/util/src/hash.c
index 9cad14e8c7..03e155845e 100644
--- a/src/util/src/hash.c
+++ b/src/util/src/hash.c
@@ -298,7 +298,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
  */
 static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
                                    uint32_t hashVal) {
-  size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
+  size_t totalSize = dataSize + sizeof(SHashNode) + keyLen + 1;  // one extra byte for null
 
   SHashNode *pNewNode = calloc(1, totalSize);
   if (pNewNode == NULL) {
diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c
index 11735d91b4..8608b64057 100644
--- a/src/util/src/tsched.c
+++ b/src/util/src/tsched.c
@@ -40,68 +40,68 @@ static void *taosProcessSchedQueue(void *param);
 static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
 
 void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
-  pthread_attr_t attr;
-  SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
+  SSchedQueue *pSched = (SSchedQueue *)calloc(sizeof(SSchedQueue), 1);
   if (pSched == NULL) {
-    pError("%s: no enough memory for pSched, reason: %s", label, strerror(errno));
-    goto _error;
+    pError("%s: no enough memory for pSched", label);
+    return NULL;
+  }
+
+  pSched->queue = (SSchedMsg *)calloc(sizeof(SSchedMsg), queueSize);
+  if (pSched->queue == NULL) {
+    pError("%s: no enough memory for queue", label);
+    taosCleanUpScheduler(pSched);
+    return NULL;
+  }
+
+  pSched->qthread = calloc(sizeof(pthread_t), numOfThreads);
+  if (pSched->qthread == NULL) {
+    pError("%s: no enough memory for qthread", label);
+    taosCleanUpScheduler(pSched);
+    return NULL;
   }
 
-  memset(pSched, 0, sizeof(SSchedQueue));
   pSched->queueSize = queueSize;
   strncpy(pSched->label, label, sizeof(pSched->label)); // fix buffer overflow
   pSched->label[sizeof(pSched->label)-1] = '\0';
-
-  if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
-    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
-    goto _error;
-  }
-
-  if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
-    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
-    goto _error;
-  }
-
-  if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
-    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
-    goto _error;
-  }
-
-  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
-    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
-    goto _error;
-  }
-
-  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
   pSched->fullSlot = 0;
   pSched->emptySlot = 0;
 
-  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)numOfThreads);
-  if (pSched->qthread == NULL) {
pError("%s: no enough memory for qthread, reason: %s", pSched->label, strerror(errno)); - goto _error; + if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { + pError("init %s:queueMutex failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (tsem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { + pError("init %s:empty semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } + + if (tsem_init(&pSched->fullSem, 0, 0) != 0) { + pError("init %s:full semaphore failed(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; + } for (int i = 0; i < numOfThreads; ++i) { - if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { - pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); - goto _error; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + int code = pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched); + pthread_attr_destroy(&attr); + if (code != 0) { + pError("%s: failed to create rpc thread(%s)", label, strerror(errno)); + taosCleanUpScheduler(pSched); + return NULL; } ++pSched->numOfThreads; } - pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); - pthread_attr_destroy(&attr); + pTrace("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads); return (void *)pSched; - -_error: - pthread_attr_destroy(&attr); - taosCleanUpScheduler(pSched); - return NULL; } void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { @@ -126,21 +126,21 @@ void *taosProcessSchedQueue(void *param) { pTrace("wait %s fullSem was interrupted", pSched->label); continue; } - pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); + pError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno)); + pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); if (tsem_post(&pSched->emptySem) != 0) - pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno)); + pError("post %s emptySem failed(%s)", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); @@ -160,22 +160,23 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { while (tsem_wait(&pSched->emptySem) != 0) { if (errno != EINTR) { - pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); + pError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); break; } pTrace("wait %s emptySem was interrupted", pSched->label); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); + pError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; 
   pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
 
   if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
-    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
+    pError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
 
-  if (tsem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
+  if (tsem_post(&pSched->fullSem) != 0)
+    pError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
 
   return 0;
 }
@@ -185,10 +186,12 @@ void taosCleanUpScheduler(void *param) {
   if (pSched == NULL) return;
 
   for (int i = 0; i < pSched->numOfThreads; ++i) {
-    pthread_cancel(pSched->qthread[i]);
+    if (pSched->qthread[i])
+      pthread_cancel(pSched->qthread[i]);
   }
   for (int i = 0; i < pSched->numOfThreads; ++i) {
-    pthread_join(pSched->qthread[i], NULL);
+    if (pSched->qthread[i])
+      pthread_join(pSched->qthread[i], NULL);
   }
 
   tsem_destroy(&pSched->emptySem);
@@ -199,8 +202,8 @@
     taosTmrStopA(&pSched->pTimer);
   }
 
-  free(pSched->queue);
-  free(pSched->qthread);
+  if (pSched->queue) free(pSched->queue);
+  if (pSched->qthread) free(pSched->qthread);
   free(pSched);  // fix memory leak
 }
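
Illustrative sketch, not taken from either patch above (every name in it is invented for the example): the pattern both commits converge on is to create worker threads joinable, let each worker pthread_exit() on its own once its queue has drained, and pthread_join() every started thread during cleanup before freeing shared state, so the process shuts down in order instead of cancelling detached threads. A minimal, self-contained C version of that life cycle:

/* exit_nicely_sketch.c — hypothetical example, compile with: cc -pthread exit_nicely_sketch.c */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct {
  pthread_t thread;
  int       workerId;
} Worker;

typedef struct {
  int              max;      /* capacity of the worker array, in the spirit of readPool.max */
  int              num;      /* number of workers actually launched                          */
  Worker          *workers;  /* allocated up front, threads started later                    */
  int              pending;  /* toy stand-in for the queue-set depth                         */
  pthread_mutex_t  lock;
} WorkerPool;

static WorkerPool pool;

/* Worker body: drain the shared counter, then exit the thread (not the process). */
static void *workerLoop(void *param) {
  Worker *w = param;
  for (;;) {
    pthread_mutex_lock(&pool.lock);
    int left = (pool.pending > 0) ? pool.pending-- : 0;
    pthread_mutex_unlock(&pool.lock);

    if (left == 0) {
      printf("worker %d: queue empty, exiting\n", w->workerId);
      pthread_exit(NULL);          /* thread ends; cleanup will join it */
    }
    usleep(1000);                  /* pretend to process one item */
  }
  return NULL;
}

static int poolInit(int maxWorkers, int pendingItems) {
  pool.max = maxWorkers;
  pool.num = 0;
  pool.pending = pendingItems;
  pool.workers = calloc((size_t)maxWorkers, sizeof(Worker));
  if (pool.workers == NULL) return -1;
  pthread_mutex_init(&pool.lock, NULL);

  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);  /* joinable, not detached */

  for (int i = 0; i < maxWorkers; ++i) {
    Worker *w = pool.workers + i;
    w->workerId = i;
    if (pthread_create(&w->thread, &attr, workerLoop, w) != 0) break;
    pool.num++;
  }
  pthread_attr_destroy(&attr);     /* the attribute object is no longer needed */
  return (pool.num > 0) ? 0 : -1;
}

/* Cleanup joins every thread that was actually started before freeing shared state. */
static void poolCleanup(void) {
  for (int i = 0; i < pool.num; ++i) {
    pthread_join(pool.workers[i].thread, NULL);
  }
  pthread_mutex_destroy(&pool.lock);
  free(pool.workers);
}

int main(void) {
  if (poolInit(4, 100) != 0) return 1;
  poolCleanup();                   /* blocks until every worker has exited */
  printf("all workers joined, exiting nicely\n");
  return 0;
}

The real code decides to exit a worker only when taosGetQueueNumber() reports the queue set is idle and more than the minimum number of workers is still running; the plain counter above just keeps the sketch self-contained.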