Merge pull request #28043 from taosdata/enh/TD-31890-14
enh: handle void
commit 2620807594
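The hunks below come from several source files in one merge: the client monitor, dnode thread management, the encoder, and the hash-table, priority-queue, LRU-cache, skip-list, and worker-pool utilities. The recurring change is the same throughout: stop silencing return values with a `(void)` cast and instead check the code, bind it to a named local, or change the callee to return void and log failures internally. A minimal sketch of the before/after pattern, with an illustrative function name that is not from this diff:

    #include <stdio.h>

    /* Hypothetical callee standing in for the TDengine helpers touched below;
     * it reports errors only through its return code. */
    static int do_cleanup(void) { return -1; /* simulate a failure */ }

    int main(void) {
      /* Before: the result was cast away and failures vanished silently. */
      /* (void)do_cleanup(); */

      /* After: capture the code and at least log the failure. */
      int ret = do_cleanup();
      if (ret != 0) {
        fprintf(stderr, "failed to clean up, ret:%d\n", ret);
      }
      return 0;
    }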
@@ -73,7 +73,7 @@ static void destroyMonitorClient(void* data) {
   }
   taosHashCleanup(pMonitor->counters);
   int ret = taos_collector_registry_destroy(pMonitor->registry);
-  if (ret){
+  if (ret) {
     tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
   }
   taosMemoryFree(pMonitor);
@@ -192,7 +192,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
 
   if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
     int ret = taos_collector_registry_clear_batch(registry);
-    if (ret){
+    if (ret) {
       tscError("failed to clear registry, ret:%d", ret);
     }
   }
@@ -215,7 +215,8 @@ static void reportSendProcess(void* param, void* tmrId) {
 
   SEpSet ep = getEpSet_s(&pInst->mgmtEp);
   generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
-  bool reset = taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
+  bool reset =
+      taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
   tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
   taosRUnLockLatch(&monitorLock);
 }
@@ -265,7 +266,7 @@ void monitorCreateClient(int64_t clusterId) {
   }
 
   int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
-  if (r){
+  if (r) {
     tscError("failed to register collector, ret:%d", r);
     goto fail;
   }
@@ -318,7 +319,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
   if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
     tscError("failed to add metric to collector");
     int r = taos_counter_destroy(newCounter);
-    if (r){
+    if (r) {
       tscError("failed to destroy counter, code: %d", r);
     }
     goto end;
@@ -326,7 +327,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
   if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
     tscError("failed to put counter to monitor");
     int r = taos_counter_destroy(newCounter);
-    if (r){
+    if (r) {
       tscError("failed to destroy counter, code: %d", r);
     }
     goto end;
@@ -394,7 +395,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
   if (pClient == NULL) {
     tscError("failed to allocate memory for slow log client");
     int32_t ret = taosCloseFile(&pFile);
-    if (ret != 0){
+    if (ret != 0) {
       tscError("failed to close file:%p ret:%d", pFile, ret);
     }
     return;
@@ -406,7 +407,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
   if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
     tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
     int32_t ret = taosCloseFile(&pFile);
-    if (ret != 0){
+    if (ret != 0) {
       tscError("failed to close file:%p ret:%d", pFile, ret);
     }
     taosMemoryFree(pClient);
@@ -635,7 +636,7 @@ static void processFileRemoved(SlowLogClient* pClient) {
     return;
   }
   int32_t ret = taosCloseFile(&(pClient->pFile));
-  if (ret != 0){
+  if (ret != 0) {
     tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
     return;
  }
@@ -728,7 +729,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
     if (taosLockFile(pFile) < 0) {
       tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
       int32_t ret = taosCloseFile(&pFile);
-      if (ret != 0){
+      if (ret != 0) {
         tscError("failed to close file:%p ret:%d", pFile, ret);
       }
       continue;
@@ -749,7 +750,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
   }
 
   int32_t ret = taosCloseDir(&pDir);
-  if (ret != 0){
+  if (ret != 0) {
     tscError("failed to close dir, ret:%d", ret);
   }
 }
@@ -831,7 +832,7 @@ static int32_t tscMonitortInit() {
 static void tscMonitorStop() {
   if (taosCheckPthreadValid(monitorThread)) {
     (void)taosThreadJoin(monitorThread, NULL);
-    (void)taosThreadClear(&monitorThread);
+    taosThreadClear(&monitorThread);
   }
 }
 
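This `tscMonitorStop` hunk, and the matching thread hunks in the dnode and worker-pool files further down, drop the cast instead of adding a check: the change set evidently makes `taosThreadClear` return void, so `(void)taosThreadClear(...)` was casting void to void. A stand-in for what such a helper plausibly does (the body is an assumption, not taken from this diff):

    #include <pthread.h>
    #include <string.h>

    /* Assumed shape: zero the handle so a later validity check fails cleanly;
     * there is nothing to return, hence the void signature. */
    static void thread_clear(pthread_t *thread) {
      memset(thread, 0, sizeof(*thread));
    }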
@@ -897,7 +898,7 @@ void monitorClose() {
   taosHashCleanup(monitorSlowLogHash);
   taosTmrCleanUp(monitorTimer);
   taosCloseQueue(monitorQueue);
-  if(tsem2_destroy(&monitorSem) != 0) {
+  if (tsem2_destroy(&monitorSem) != 0) {
     tscError("failed to destroy semaphore");
   }
   taosWUnLockLatch(&monitorLock);
@@ -921,7 +922,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
   tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
            queueTypeStr[slowLogData->type], slowLogData->data);
   if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
-    if(tsem2_post(&monitorSem) != 0) {
+    if (tsem2_post(&monitorSem) != 0) {
       tscError("failed to post semaphore");
     }
   } else {

@@ -347,14 +347,14 @@ int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
 void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
   if (taosCheckPthreadValid(pMgmt->monitorThread)) {
     (void)taosThreadJoin(pMgmt->monitorThread, NULL);
-    (void)taosThreadClear(&pMgmt->monitorThread);
+    taosThreadClear(&pMgmt->monitorThread);
   }
 }
 
 void dmStopAuditThread(SDnodeMgmt *pMgmt) {
   if (taosCheckPthreadValid(pMgmt->auditThread)) {
     (void)taosThreadJoin(pMgmt->auditThread, NULL);
-    (void)taosThreadClear(&pMgmt->auditThread);
+    taosThreadClear(&pMgmt->auditThread);
   }
 }
 
@@ -385,7 +385,7 @@ void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
 
   if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
     (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
-    (void)taosThreadClear(&pMgmt->crashReportThread);
+    taosThreadClear(&pMgmt->crashReportThread);
   }
 }

@@ -112,7 +112,7 @@ void tEndEncode(SEncoder* pCoder) {
   pCoder->size = pNode->size;
   pCoder->pos = pNode->pos;
 
-  (void)tEncodeI32(pCoder, len);
+  int32_t ret = tEncodeI32(pCoder, len);
 
   pCoder->pos += len;
 }

@@ -391,14 +391,13 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
 
 int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) {
   terrno = 0;
-  (void)taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false);
+  void *data = taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false);
   return terrno;
 }
 
 int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t *size) {
   terrno = 0;
-
-  (void)taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
+  void *data = taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
   return terrno;
 }
 
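In both `taosHashGetDup` variants the lookup result is now bound to a local `data` instead of being cast away; the local is never read, and the caller still learns the outcome through the thread-local `terrno` that `taosHashGetImpl` sets while copying the value into `destBuf`. A simplified sketch of that contract, with `t_errno` and the helper names as stand-ins:

    #include <stddef.h>

    static _Thread_local int t_errno;  /* stand-in for TDengine's terrno */

    /* Pretends to copy the value for key into destBuf, recording any error
     * in t_errno as a side effect. */
    static void *hash_get_impl(const void *key, void *destBuf) {
      (void)key;
      (void)destBuf;
      t_errno = 0;  /* simulate a successful lookup */
      return NULL;
    }

    static int hash_get_dup(const void *key, void *destBuf) {
      t_errno = 0;
      void *data = hash_get_impl(key, destBuf);  /* result deliberately unused */
      (void)data;  /* only to keep this standalone sketch warning-free */
      return t_errno;  /* the error code is the real return channel */
    }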
@@ -257,10 +257,11 @@ static PriorityQueueNode* pqHeapify(PriorityQueue* pq, size_t from, size_t last)
 
 static void pqBuildHeap(PriorityQueue* pq) {
   if (pqContainerSize(pq) > 1) {
+    PriorityQueueNode* node;
     for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
-      (void)pqHeapify(pq, i, pqContainerSize(pq));
+      node = pqHeapify(pq, i, pqContainerSize(pq));
     }
-    (void)pqHeapify(pq, 0, pqContainerSize(pq));
+    node = pqHeapify(pq, 0, pqContainerSize(pq));
   }
 }
 
@@ -274,23 +275,24 @@ static PriorityQueueNode* pqReverseHeapify(PriorityQueue* pq, size_t i) {
 }
 
 static void pqUpdate(PriorityQueue* pq, size_t i) {
+  PriorityQueueNode* node;
   if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
     // if value in pos i is smaller than parent, heapify down from i to the end
-    (void)pqHeapify(pq, i, pqContainerSize(pq));
+    node = pqHeapify(pq, i, pqContainerSize(pq));
   } else {
     // if value in pos i is big than parent, heapify up from i
-    (void)pqReverseHeapify(pq, i);
+    node = pqReverseHeapify(pq, i);
   }
 }
 
 static void pqRemove(PriorityQueue* pq, size_t i) {
   if (i == pqContainerSize(pq) - 1) {
-    (void)taosArrayPop(pq->container);
+    void* tmp = taosArrayPop(pq->container);
     return;
   }
 
   taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
-  (void)taosArrayPop(pq->container);
+  void* tmp = taosArrayPop(pq->container);
   pqUpdate(pq, i);
 }
 
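In `pqBuildHeap`, `pqUpdate`, and `pqRemove` the heapify and pop results are now bound to locals (`node`, `tmp`) that are never read: the calls are made purely for their side effects on the backing array, and the named locals just make the discarded results explicit. The build order itself sifts down from the last index to the root, Floyd-style, as this int-array illustration shows (all names here are illustrative):

    #include <stddef.h>

    /* Sift-down on a max-heap of ints; returns the final resting index. */
    static size_t heapify(int *a, size_t n, size_t i) {
      for (;;) {
        size_t l = 2 * i + 1, r = l + 1, m = i;
        if (l < n && a[l] > a[m]) m = l;
        if (r < n && a[r] > a[m]) m = r;
        if (m == i) return i;
        int t = a[i]; a[i] = a[m]; a[m] = t;
        i = m;
      }
    }

    static void build_heap(int *a, size_t n) {
      if (n > 1) {
        for (size_t i = n - 1; i > 0; --i) {
          size_t last = heapify(a, n, i);  /* kept only to be explicit */
          (void)last;                      /* silences the standalone sketch */
        }
        (void)heapify(a, n, 0);
      }
    }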
@@ -305,7 +305,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr
     SLRUEntry *old = shard->lru.next;
 
     taosLRUCacheShardLRURemove(shard, old);
-    (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
+    SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
 
     TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
     shard->usage -= old->totalCharge;
@@ -529,7 +529,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
   while (shard->lru.next != &shard->lru) {
     SLRUEntry *old = shard->lru.next;
     taosLRUCacheShardLRURemove(shard, old);
-    (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
+    SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
     TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
     shard->usage -= old->totalCharge;
 
@@ -574,7 +574,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
   lastReference = taosLRUEntryUnref(e);
   if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
     if (shard->usage > shard->capacity || eraseIfLastRef) {
-      (void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
+      SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
       TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
     } else {
       taosLRUCacheShardLRUInsert(shard, e);

@@ -32,9 +32,9 @@ static SSkipListNode *tSkipListNewNode(uint8_t level);
 static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
                                        bool hasDup);
 
-static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList);
-static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList);
-static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList);
+static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList);
+static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList);
+static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList);
 static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList);
 
 SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags,
@@ -103,7 +103,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
 void tSkipListDestroy(SSkipList *pSkipList) {
   if (pSkipList == NULL) return;
 
-  (void)tSkipListWLock(pSkipList);
+  tSkipListWLock(pSkipList);
 
   SSkipListNode *pNode = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, 0);
 
@@ -113,7 +113,7 @@ void tSkipListDestroy(SSkipList *pSkipList) {
     tSkipListFreeNode(pTemp);
   }
 
-  (void)tSkipListUnlock(pSkipList);
+  tSkipListUnlock(pSkipList);
   if (pSkipList->lock != NULL) {
     (void)taosThreadRwlockDestroy(pSkipList->lock);
     taosMemoryFreeClear(pSkipList->lock);
@@ -130,12 +130,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
   SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
   SSkipListNode *pNode = NULL;
 
-  (void)tSkipListWLock(pSkipList);
+  tSkipListWLock(pSkipList);
 
   bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
   pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
 
-  (void)tSkipListUnlock(pSkipList);
+  tSkipListUnlock(pSkipList);
 
   return pNode;
 }
@@ -293,11 +293,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *
     return iter;
   }
 
-  (void)tSkipListRLock(pSkipList);
+  tSkipListRLock(pSkipList);
 
   iter->cur = getPriorNode(pSkipList, val, order, &(iter->next));
 
-  (void)tSkipListUnlock(pSkipList);
+  tSkipListUnlock(pSkipList);
 
   return iter;
 }
@@ -307,13 +307,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
 
   SSkipList *pSkipList = iter->pSkipList;
 
-  (void)tSkipListRLock(pSkipList);
+  tSkipListRLock(pSkipList);
 
   if (iter->order == TSDB_ORDER_ASC) {
     // no data in the skip list
     if (iter->cur == pSkipList->pTail || iter->next == NULL) {
       iter->cur = pSkipList->pTail;
-      (void)tSkipListUnlock(pSkipList);
+      tSkipListUnlock(pSkipList);
       return false;
     }
 
@@ -329,7 +329,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
   } else {
     if (iter->cur == pSkipList->pHead) {
       iter->cur = pSkipList->pHead;
-      (void)tSkipListUnlock(pSkipList);
+      tSkipListUnlock(pSkipList);
       return false;
     }
 
@@ -344,7 +344,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
     iter->step++;
   }
 
-  (void)tSkipListUnlock(pSkipList);
+  tSkipListUnlock(pSkipList);
 
   return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead);
 }
@@ -413,25 +413,31 @@ static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t
   return iter;
 }
 
-static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList) {
+static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList) {
   if (pSkipList->lock) {
-    return taosThreadRwlockWrlock(pSkipList->lock);
+    if (taosThreadRwlockWrlock(pSkipList->lock) != 0) {
+      uError("failed to lock skip list");
+    }
   }
-  return 0;
+  return;
 }
 
-static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList) {
+static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList) {
   if (pSkipList->lock) {
-    return taosThreadRwlockRdlock(pSkipList->lock);
+    if (taosThreadRwlockRdlock(pSkipList->lock) != 0) {
+      uError("failed to lock skip list");
+    }
   }
-  return 0;
+  return;
 }
 
-static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) {
+static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList) {
   if (pSkipList->lock) {
-    return taosThreadRwlockUnlock(pSkipList->lock);
+    if (taosThreadRwlockUnlock(pSkipList->lock) != 0) {
+      uError("failed to unlock skip list");
+    }
   }
-  return 0;
+  return;
 }
 
 static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
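The skip-list lock wrappers change from returning an `int32_t` that every caller cast away to returning void and logging failures at the point they occur. A self-contained sketch of that shape using pthread rwlocks directly; the struct and logging are simplified stand-ins:

    #include <pthread.h>
    #include <stdio.h>

    typedef struct {
      pthread_rwlock_t *lock;  /* NULL when the list was created without locking */
    } SkipList;

    static inline void skiplist_wlock(SkipList *sl) {
      if (sl->lock) {
        if (pthread_rwlock_wrlock(sl->lock) != 0) {
          fprintf(stderr, "failed to lock skip list\n");
        }
      }
    }

    static inline void skiplist_unlock(SkipList *sl) {
      if (sl->lock) {
        if (pthread_rwlock_unlock(sl->lock) != 0) {
          fprintf(stderr, "failed to unlock skip list\n");
        }
      }
    }

The trade-off is that callers can no longer tell a held lock from a failed one, which the diff accepts since every call site was already discarding the code.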
@@ -59,7 +59,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
     if (taosCheckPthreadValid(worker->thread)) {
       uInfo("worker:%s:%d is stopping", pool->name, worker->id);
       (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
       uInfo("worker:%s:%d is stopped", pool->name, worker->id);
     }
   }
@@ -77,7 +77,11 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
   void *msg = NULL;
   int32_t code = 0;
 
-  (void)taosBlockSIGPIPE();
+  int32_t ret = taosBlockSIGPIPE();
+  if (ret < 0) {
+    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
+  }
+
   setThreadName(pool->name);
   worker->pid = taosGetSelfPthreadId();
   uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
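Each worker thread entry point now captures the result of `taosBlockSIGPIPE` and logs failures rather than casting the call to void. The helper is most plausibly a `pthread_sigmask` wrapper that stops a write to a closed peer from killing the process; the sketch below shows that assumed shape (wrapper name and log text are illustrative):

    #include <pthread.h>
    #include <signal.h>
    #include <stdio.h>

    /* Assumed implementation: block SIGPIPE for the calling thread so writes
     * to a closed socket fail with EPIPE instead of terminating the process. */
    static int block_sigpipe(void) {
      sigset_t set;
      sigemptyset(&set);
      sigaddset(&set, SIGPIPE);
      return pthread_sigmask(SIG_BLOCK, &set, NULL) == 0 ? 0 : -1;
    }

    static void *worker_main(void *arg) {
      (void)arg;
      if (block_sigpipe() < 0) {
        fprintf(stderr, "worker failed to block SIGPIPE\n");  /* log, then continue */
      }
      /* ... pop and process queue items ... */
      return NULL;
    }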
@@ -122,7 +126,13 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
 
   (void)taosThreadMutexLock(&pool->mutex);
   taosSetQueueFp(queue, fp, NULL);
-  (void)taosAddIntoQset(pool->qset, queue, ahandle);
+  code = taosAddIntoQset(pool->qset, queue, ahandle);
+  if (code) {
+    taosCloseQueue(queue);
+    (void)taosThreadMutexUnlock(&pool->mutex);
+    terrno = code;
+    return NULL;
+  }
 
   // spawn a thread to process queue
   if (pool->num < pool->max) {
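`tQWorkerAllocQueue` above, and the other two AllocQueue variants below, now treat `taosAddIntoQset` as fallible: on error they close the just-created queue, release the mutex before returning, publish the code, and hand back NULL. A reduced sketch of that unwind pattern with stand-in types and helpers:

    #include <errno.h>
    #include <pthread.h>
    #include <stdlib.h>

    typedef struct Queue { int id; } Queue;
    typedef struct Pool {
      pthread_mutex_t mutex;
      int             nqueues;
    } Pool;

    /* Stand-in for taosAddIntoQset: 0 on success, an error code on failure. */
    static int add_into_qset(Pool *pool, Queue *q) {
      (void)q;
      pool->nqueues++;
      return 0;
    }

    static Queue *alloc_queue(Pool *pool) {
      Queue *q = calloc(1, sizeof(*q));
      if (q == NULL) return NULL;

      pthread_mutex_lock(&pool->mutex);
      int code = add_into_qset(pool, q);
      if (code != 0) {
        free(q);                             /* taosCloseQueue(queue) */
        pthread_mutex_unlock(&pool->mutex);  /* unlock on every exit path */
        errno = code;                        /* terrno = code */
        return NULL;
      }
      pthread_mutex_unlock(&pool->mutex);
      return q;
    }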
@@ -191,7 +201,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
     if (taosCheckPthreadValid(worker->thread)) {
       uInfo("worker:%s:%d is stopping", pool->name, worker->id);
       (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
       uInfo("worker:%s:%d is stopped", pool->name, worker->id);
     }
     taosMemoryFree(worker);
@@ -210,7 +220,11 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
   void *msg = NULL;
   int32_t code = 0;
 
-  (void)taosBlockSIGPIPE();
+  int32_t ret = taosBlockSIGPIPE();
+  if (ret < 0) {
+    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
+  }
+
   setThreadName(pool->name);
   worker->pid = taosGetSelfPthreadId();
   uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@@ -254,7 +268,14 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
 
   (void)taosThreadMutexLock(&pool->mutex);
   taosSetQueueFp(queue, fp, NULL);
-  (void)taosAddIntoQset(pool->qset, queue, ahandle);
+
+  code = taosAddIntoQset(pool->qset, queue, ahandle);
+  if (code) {
+    taosCloseQueue(queue);
+    (void)taosThreadMutexUnlock(&pool->mutex);
+    terrno = code;
+    return NULL;
+  }
 
   int32_t queueNum = taosGetQueueNumber(pool->qset);
   int32_t curWorkerNum = taosArrayGetSize(pool->workers);
@@ -281,7 +302,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
 
     if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
       uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
-      (void)taosArrayPop(pool->workers);
+      void *tmp = taosArrayPop(pool->workers);
       taosMemoryFree(worker);
       taosCloseQueue(queue);
       terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -342,7 +363,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
     if (taosCheckPthreadValid(worker->thread)) {
       uInfo("worker:%s:%d is stopping", pool->name, worker->id);
       (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
       taosFreeQall(worker->qall);
       taosCloseQset(worker->qset);
       uInfo("worker:%s:%d is stopped", pool->name, worker->id);
@@ -362,7 +383,11 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
   int32_t code = 0;
   int32_t numOfMsgs = 0;
 
-  (void)taosBlockSIGPIPE();
+  int32_t ret = taosBlockSIGPIPE();
+  if (ret < 0) {
+    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
+  }
+
   setThreadName(pool->name);
   worker->pid = taosGetSelfPthreadId();
   uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@@ -407,7 +432,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
     code = taosOpenQset(&worker->qset);
     if (code) goto _OVER;
 
-    (void)taosAddIntoQset(worker->qset, queue, ahandle);
+    code = taosAddIntoQset(worker->qset, queue, ahandle);
+    if (code) goto _OVER;
     code = taosAllocateQall(&worker->qall);
     if (code) goto _OVER;
 
@@ -423,7 +449,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
     pool->num++;
     if (pool->num > pool->max) pool->num = pool->max;
   } else {
-    (void)taosAddIntoQset(worker->qset, queue, ahandle);
+    code = taosAddIntoQset(worker->qset, queue, ahandle);
+    if (code) goto _OVER;
     pool->nextId = (pool->nextId + 1) % pool->max;
   }
 
@@ -551,7 +578,7 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) {
 static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
 static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
 static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
-static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
+static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
 static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
 
 #define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32)
@@ -629,7 +656,11 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
   void *msg = NULL;
   int32_t code = 0;
 
-  (void)taosBlockSIGPIPE();
+  int32_t ret = taosBlockSIGPIPE();
+  if (ret < 0) {
+    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
+  }
+
   setThreadName(pool->name);
   worker->pid = taosGetSelfPthreadId();
   uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@@ -648,7 +679,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
       }
     }
 
-    (void)tQueryAutoQWorkerWaitingCheck(pool);
+    tQueryAutoQWorkerWaitingCheck(pool);
 
     if (qinfo.fp != NULL) {
       qinfo.workerId = worker->id;
@@ -717,13 +748,13 @@ static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
   return false;
 }
 
-static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
+static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
   while (1) {
     int64_t val64 = pPool->activeRunningN;
    int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
     while (running < pPool->num) {
       if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
-        return TSDB_CODE_SUCCESS;
+        return;
       }
     }
     if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
@@ -736,7 +767,7 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
   if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
   // recovered from waiting
   (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
-  return TSDB_CODE_SUCCESS;
+  return;
 }
 
 bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
@@ -744,7 +775,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
       tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
     (void)taosThreadMutexLock(&pPool->poolLock);
     SListNode *pNode = listNode(pWorker);
-    (void)tdListPopNode(pPool->workers, pNode);
+    SListNode *tNode = tdListPopNode(pPool->workers, pNode);
     // reclaim some workers
     if (pWorker->id >= pPool->maxInUse) {
       while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
@@ -752,7 +783,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
         SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data;
         if (pWorker && taosCheckPthreadValid(pWorker->thread)) {
           (void)taosThreadJoin(pWorker->thread, NULL);
-          (void)taosThreadClear(&pWorker->thread);
+          taosThreadClear(&pWorker->thread);
         }
         taosMemoryFree(head);
       }
@@ -777,7 +808,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
       (void)taosThreadMutexUnlock(&pPool->poolLock);
       return false;
     }
-    (void)tdListPopNode(pPool->backupWorkers, pNode);
+    SListNode *tNode1 = tdListPopNode(pPool->backupWorkers, pNode);
     tdListAppendNode(pPool->workers, pNode);
     (void)taosThreadMutexUnlock(&pPool->poolLock);
 
@@ -862,7 +893,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
     (void)taosThreadMutexUnlock(&pPool->poolLock);
     if (worker && taosCheckPthreadValid(worker->thread)) {
       (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
     }
     taosMemoryFree(pNode);
   }
@@ -872,7 +903,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
     worker = (SQueryAutoQWorker *)pNode->data;
     if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
     }
     taosMemoryFree(pNode);
   }
@@ -882,7 +913,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
     worker = (SQueryAutoQWorker *)pNode->data;
     if (worker && taosCheckPthreadValid(worker->thread)) {
       (void)taosThreadJoin(worker->thread, NULL);
-      (void)taosThreadClear(&worker->thread);
+      taosThreadClear(&worker->thread);
     }
     taosMemoryFree(pNode);
   }
@@ -913,7 +944,13 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
 
   (void)taosThreadMutexLock(&pool->poolLock);
   taosSetQueueFp(queue, fp, NULL);
-  (void)taosAddIntoQset(pool->qset, queue, ahandle);
+  code = taosAddIntoQset(pool->qset, queue, ahandle);
+  if (code) {
+    taosCloseQueue(queue);
+    queue = NULL;
+    (void)taosThreadMutexUnlock(&pool->poolLock);
+    return NULL;
+  }
   SQueryAutoQWorker worker = {0};
   SQueryAutoQWorker *pWorker = NULL;
 