From a1804419986fa8f8f141290529cee0669265111b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 23 Sep 2024 16:20:05 +0800 Subject: [PATCH] enh: handle void --- source/client/src/clientMonitor.c | 29 +++---- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 6 +- source/util/src/tencode.c | 2 +- source/util/src/thash.c | 5 +- source/util/src/theap.c | 14 ++-- source/util/src/tlrucache.c | 6 +- source/util/src/tskiplist.c | 50 ++++++------ source/util/src/tworker.c | 85 +++++++++++++++------ 8 files changed, 121 insertions(+), 76 deletions(-) diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index dbcc046b4a..f424e4d1a2 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -73,7 +73,7 @@ static void destroyMonitorClient(void* data) { } taosHashCleanup(pMonitor->counters); int ret = taos_collector_registry_destroy(pMonitor->registry); - if (ret){ + if (ret) { tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret); } taosMemoryFree(pMonitor); @@ -192,7 +192,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { int ret = taos_collector_registry_clear_batch(registry); - if (ret){ + if (ret) { tscError("failed to clear registry, ret:%d", ret); } } @@ -215,7 +215,8 @@ static void reportSendProcess(void* param, void* tmrId) { SEpSet ep = getEpSet_s(&pInst->mgmtEp); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); - bool reset = taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); + bool reset = + taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset); taosRUnLockLatch(&monitorLock); } @@ -265,7 +266,7 @@ void monitorCreateClient(int64_t clusterId) { } int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); - if (r){ + if (r) { tscError("failed to register collector, ret:%d", r); goto fail; } @@ -318,7 +319,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) { tscError("failed to add metric to collector"); int r = taos_counter_destroy(newCounter); - if (r){ + if (r) { tscError("failed to destroy counter, code: %d", r); } goto end; @@ -326,7 +327,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); int r = taos_counter_destroy(newCounter); - if (r){ + if (r) { tscError("failed to destroy counter, code: %d", r); } goto end; @@ -394,7 +395,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP if (pClient == NULL) { tscError("failed to allocate memory for slow log client"); int32_t ret = taosCloseFile(&pFile); - if (ret != 0){ + if (ret != 0) { tscError("failed to close file:%p ret:%d", pFile, ret); } return; @@ -406,7 +407,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); int32_t ret = taosCloseFile(&pFile); - if (ret != 0){ + if (ret != 0) { tscError("failed to close file:%p ret:%d", pFile, ret); } taosMemoryFree(pClient); @@ -635,7 +636,7 @@ static void processFileRemoved(SlowLogClient* pClient) { return; } int32_t ret = taosCloseFile(&(pClient->pFile)); - if (ret != 0){ + if (ret != 0) { tscError("failed to close file:%p ret:%d", pClient->pFile, ret); return; } @@ -728,7 +729,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { if (taosLockFile(pFile) < 0) { tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); int32_t ret = taosCloseFile(&pFile); - if (ret != 0){ + if (ret != 0) { tscError("failed to close file:%p ret:%d", pFile, ret); } continue; @@ -749,7 +750,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { } int32_t ret = taosCloseDir(&pDir); - if (ret != 0){ + if (ret != 0) { tscError("failed to close dir, ret:%d", ret); } } @@ -831,7 +832,7 @@ static int32_t tscMonitortInit() { static void tscMonitorStop() { if (taosCheckPthreadValid(monitorThread)) { (void)taosThreadJoin(monitorThread, NULL); - (void)taosThreadClear(&monitorThread); + taosThreadClear(&monitorThread); } } @@ -897,7 +898,7 @@ void monitorClose() { taosHashCleanup(monitorSlowLogHash); taosTmrCleanUp(monitorTimer); taosCloseQueue(monitorQueue); - if(tsem2_destroy(&monitorSem) != 0) { + if (tsem2_destroy(&monitorSem) != 0) { tscError("failed to destroy semaphore"); } taosWUnLockLatch(&monitorLock); @@ -921,7 +922,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); if (taosWriteQitem(monitorQueue, slowLogData) == 0) { - if(tsem2_post(&monitorSem) != 0) { + if (tsem2_post(&monitorSem) != 0) { tscError("failed to post semaphore"); } } else { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 4cfadc8f59..1ed7c9ecd9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -347,14 +347,14 @@ int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->monitorThread)) { (void)taosThreadJoin(pMgmt->monitorThread, NULL); - (void)taosThreadClear(&pMgmt->monitorThread); + taosThreadClear(&pMgmt->monitorThread); } } void dmStopAuditThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->auditThread)) { (void)taosThreadJoin(pMgmt->auditThread, NULL); - (void)taosThreadClear(&pMgmt->auditThread); + taosThreadClear(&pMgmt->auditThread); } } @@ -385,7 +385,7 @@ void dmStopCrashReportThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->crashReportThread)) { (void)taosThreadJoin(pMgmt->crashReportThread, NULL); - (void)taosThreadClear(&pMgmt->crashReportThread); + taosThreadClear(&pMgmt->crashReportThread); } } diff --git a/source/util/src/tencode.c b/source/util/src/tencode.c index 99b0b2bded..e962edaa27 100644 --- a/source/util/src/tencode.c +++ b/source/util/src/tencode.c @@ -112,7 +112,7 @@ void tEndEncode(SEncoder* pCoder) { pCoder->size = pNode->size; pCoder->pos = pNode->pos; - (void)tEncodeI32(pCoder, len); + int32_t ret = tEncodeI32(pCoder, len); pCoder->pos += len; } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 758e283bc3..e3fc68928f 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -391,14 +391,13 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) { terrno = 0; - (void)taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false); + void *data = taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false); return terrno; } int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t *size) { terrno = 0; - - (void)taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false); + void *data = taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false); return terrno; } diff --git a/source/util/src/theap.c b/source/util/src/theap.c index 7ee49ff56d..e906d1f55b 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -257,10 +257,11 @@ static PriorityQueueNode* pqHeapify(PriorityQueue* pq, size_t from, size_t last) static void pqBuildHeap(PriorityQueue* pq) { if (pqContainerSize(pq) > 1) { + PriorityQueueNode* node; for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) { - (void)pqHeapify(pq, i, pqContainerSize(pq)); + node = pqHeapify(pq, i, pqContainerSize(pq)); } - (void)pqHeapify(pq, 0, pqContainerSize(pq)); + node = pqHeapify(pq, 0, pqContainerSize(pq)); } } @@ -274,23 +275,24 @@ static PriorityQueueNode* pqReverseHeapify(PriorityQueue* pq, size_t i) { } static void pqUpdate(PriorityQueue* pq, size_t i) { + PriorityQueueNode* node; if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) { // if value in pos i is smaller than parent, heapify down from i to the end - (void)pqHeapify(pq, i, pqContainerSize(pq)); + node = pqHeapify(pq, i, pqContainerSize(pq)); } else { // if value in pos i is big than parent, heapify up from i - (void)pqReverseHeapify(pq, i); + node = pqReverseHeapify(pq, i); } } static void pqRemove(PriorityQueue* pq, size_t i) { if (i == pqContainerSize(pq) - 1) { - (void)taosArrayPop(pq->container); + void* tmp = taosArrayPop(pq->container); return; } taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1)); - (void)taosArrayPop(pq->container); + void* tmp = taosArrayPop(pq->container); pqUpdate(pq, i); } diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index cfbd875890..5764f936f6 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -305,7 +305,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr SLRUEntry *old = shard->lru.next; taosLRUCacheShardLRURemove(shard, old); - (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); shard->usage -= old->totalCharge; @@ -529,7 +529,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { while (shard->lru.next != &shard->lru) { SLRUEntry *old = shard->lru.next; taosLRUCacheShardLRURemove(shard, old); - (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); + SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); shard->usage -= old->totalCharge; @@ -574,7 +574,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b lastReference = taosLRUEntryUnref(e); if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) { if (shard->usage > shard->capacity || eraseIfLastRef) { - (void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); + SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); } else { taosLRUCacheShardLRUInsert(shard, e); diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index ae01292e08..95680686cf 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -32,9 +32,9 @@ static SSkipListNode *tSkipListNewNode(uint8_t level); static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward, bool hasDup); -static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList); -static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList); -static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList); +static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList); +static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList); +static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList); static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList); SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags, @@ -103,7 +103,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ void tSkipListDestroy(SSkipList *pSkipList) { if (pSkipList == NULL) return; - (void)tSkipListWLock(pSkipList); + tSkipListWLock(pSkipList); SSkipListNode *pNode = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, 0); @@ -113,7 +113,7 @@ void tSkipListDestroy(SSkipList *pSkipList) { tSkipListFreeNode(pTemp); } - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); if (pSkipList->lock != NULL) { (void)taosThreadRwlockDestroy(pSkipList->lock); taosMemoryFreeClear(pSkipList->lock); @@ -130,12 +130,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *pNode = NULL; - (void)tSkipListWLock(pSkipList); + tSkipListWLock(pSkipList); bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); return pNode; } @@ -293,11 +293,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char * return iter; } - (void)tSkipListRLock(pSkipList); + tSkipListRLock(pSkipList); iter->cur = getPriorNode(pSkipList, val, order, &(iter->next)); - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); return iter; } @@ -307,13 +307,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) { SSkipList *pSkipList = iter->pSkipList; - (void)tSkipListRLock(pSkipList); + tSkipListRLock(pSkipList); if (iter->order == TSDB_ORDER_ASC) { // no data in the skip list if (iter->cur == pSkipList->pTail || iter->next == NULL) { iter->cur = pSkipList->pTail; - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); return false; } @@ -329,7 +329,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) { } else { if (iter->cur == pSkipList->pHead) { iter->cur = pSkipList->pHead; - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); return false; } @@ -344,7 +344,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) { iter->step++; } - (void)tSkipListUnlock(pSkipList); + tSkipListUnlock(pSkipList); return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); } @@ -413,25 +413,31 @@ static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t return iter; } -static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList) { +static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList) { if (pSkipList->lock) { - return taosThreadRwlockWrlock(pSkipList->lock); + if (taosThreadRwlockWrlock(pSkipList->lock) != 0) { + uError("failed to lock skip list"); + } } - return 0; + return; } -static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList) { +static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList) { if (pSkipList->lock) { - return taosThreadRwlockRdlock(pSkipList->lock); + if (taosThreadRwlockRdlock(pSkipList->lock) != 0) { + uError("failed to lock skip list"); + } } - return 0; + return; } -static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) { +static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList) { if (pSkipList->lock) { - return taosThreadRwlockUnlock(pSkipList->lock); + if (taosThreadRwlockUnlock(pSkipList->lock) != 0) { + uError("failed to unlock skip list"); + } } - return 0; + return; } static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index c2757dcabc..940cc2b8c8 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -59,7 +59,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) { if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); uInfo("worker:%s:%d is stopped", pool->name, worker->id); } } @@ -77,7 +77,11 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { void *msg = NULL; int32_t code = 0; - (void)taosBlockSIGPIPE(); + int32_t ret = taosBlockSIGPIPE(); + if (ret < 0) { + uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id); + } + setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); @@ -122,7 +126,13 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { (void)taosThreadMutexLock(&pool->mutex); taosSetQueueFp(queue, fp, NULL); - (void)taosAddIntoQset(pool->qset, queue, ahandle); + code = taosAddIntoQset(pool->qset, queue, ahandle); + if (code) { + taosCloseQueue(queue); + (void)taosThreadMutexUnlock(&pool->mutex); + terrno = code; + return NULL; + } // spawn a thread to process queue if (pool->num < pool->max) { @@ -191,7 +201,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) { if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); uInfo("worker:%s:%d is stopped", pool->name, worker->id); } taosMemoryFree(worker); @@ -210,7 +220,11 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { void *msg = NULL; int32_t code = 0; - (void)taosBlockSIGPIPE(); + int32_t ret = taosBlockSIGPIPE(); + if (ret < 0) { + uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id); + } + setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); @@ -254,7 +268,14 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem (void)taosThreadMutexLock(&pool->mutex); taosSetQueueFp(queue, fp, NULL); - (void)taosAddIntoQset(pool->qset, queue, ahandle); + + code = taosAddIntoQset(pool->qset, queue, ahandle); + if (code) { + taosCloseQueue(queue); + (void)taosThreadMutexUnlock(&pool->mutex); + terrno = code; + return NULL; + } int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t curWorkerNum = taosArrayGetSize(pool->workers); @@ -281,7 +302,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) { uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum); - (void)taosArrayPop(pool->workers); + void *tmp = taosArrayPop(pool->workers); taosMemoryFree(worker); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -342,7 +363,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) { if (taosCheckPthreadValid(worker->thread)) { uInfo("worker:%s:%d is stopping", pool->name, worker->id); (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); taosFreeQall(worker->qall); taosCloseQset(worker->qset); uInfo("worker:%s:%d is stopped", pool->name, worker->id); @@ -362,7 +383,11 @@ static void *tWWorkerThreadFp(SWWorker *worker) { int32_t code = 0; int32_t numOfMsgs = 0; - (void)taosBlockSIGPIPE(); + int32_t ret = taosBlockSIGPIPE(); + if (ret < 0) { + uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id); + } + setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); @@ -407,7 +432,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { code = taosOpenQset(&worker->qset); if (code) goto _OVER; - (void)taosAddIntoQset(worker->qset, queue, ahandle); + code = taosAddIntoQset(worker->qset, queue, ahandle); + if (code) goto _OVER; code = taosAllocateQall(&worker->qall); if (code) goto _OVER; @@ -423,7 +449,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pool->num++; if (pool->num > pool->max) pool->num = pool->max; } else { - (void)taosAddIntoQset(worker->qset, queue, ahandle); + code = taosAddIntoQset(worker->qset, queue, ahandle); + if (code) goto _OVER; pool->nextId = (pool->nextId + 1) % pool->max; } @@ -551,7 +578,7 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) { static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); -static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool); +static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker); #define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) @@ -629,7 +656,11 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { void *msg = NULL; int32_t code = 0; - (void)taosBlockSIGPIPE(); + int32_t ret = taosBlockSIGPIPE(); + if (ret < 0) { + uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id); + } + setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); @@ -648,7 +679,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - (void)tQueryAutoQWorkerWaitingCheck(pool); + tQueryAutoQWorkerWaitingCheck(pool); if (qinfo.fp != NULL) { qinfo.workerId = worker->id; @@ -717,13 +748,13 @@ static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) { return false; } -static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) { +static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) { while (1) { int64_t val64 = pPool->activeRunningN; int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); while (running < pPool->num) { if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) { - return TSDB_CODE_SUCCESS; + return; } } if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) { @@ -736,7 +767,7 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) { if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); // recovered from waiting (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); - return TSDB_CODE_SUCCESS; + return; } bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) { @@ -744,7 +775,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) { (void)taosThreadMutexLock(&pPool->poolLock); SListNode *pNode = listNode(pWorker); - (void)tdListPopNode(pPool->workers, pNode); + SListNode *tNode = tdListPopNode(pPool->workers, pNode); // reclaim some workers if (pWorker->id >= pPool->maxInUse) { while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) { @@ -752,7 +783,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data; if (pWorker && taosCheckPthreadValid(pWorker->thread)) { (void)taosThreadJoin(pWorker->thread, NULL); - (void)taosThreadClear(&pWorker->thread); + taosThreadClear(&pWorker->thread); } taosMemoryFree(head); } @@ -777,7 +808,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ (void)taosThreadMutexUnlock(&pPool->poolLock); return false; } - (void)tdListPopNode(pPool->backupWorkers, pNode); + SListNode *tNode1 = tdListPopNode(pPool->backupWorkers, pNode); tdListAppendNode(pPool->workers, pNode); (void)taosThreadMutexUnlock(&pPool->poolLock); @@ -862,7 +893,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { (void)taosThreadMutexUnlock(&pPool->poolLock); if (worker && taosCheckPthreadValid(worker->thread)) { (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); } taosMemoryFree(pNode); } @@ -872,7 +903,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); } taosMemoryFree(pNode); } @@ -882,7 +913,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { (void)taosThreadJoin(worker->thread, NULL); - (void)taosThreadClear(&worker->thread); + taosThreadClear(&worker->thread); } taosMemoryFree(pNode); } @@ -913,7 +944,13 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand (void)taosThreadMutexLock(&pool->poolLock); taosSetQueueFp(queue, fp, NULL); - (void)taosAddIntoQset(pool->qset, queue, ahandle); + code = taosAddIntoQset(pool->qset, queue, ahandle); + if (code) { + taosCloseQueue(queue); + queue = NULL; + (void)taosThreadMutexUnlock(&pool->poolLock); + return NULL; + } SQueryAutoQWorker worker = {0}; SQueryAutoQWorker *pWorker = NULL;