enh: handle void

This commit is contained in:
Hongze Cheng 2024-09-23 16:20:05 +08:00
parent 5d83ea76dc
commit a180441998
8 changed files with 121 additions and 76 deletions

View File

@ -73,7 +73,7 @@ static void destroyMonitorClient(void* data) {
} }
taosHashCleanup(pMonitor->counters); taosHashCleanup(pMonitor->counters);
int ret = taos_collector_registry_destroy(pMonitor->registry); int ret = taos_collector_registry_destroy(pMonitor->registry);
if (ret){ if (ret) {
tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret); tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
} }
taosMemoryFree(pMonitor); taosMemoryFree(pMonitor);
@ -192,7 +192,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
int ret = taos_collector_registry_clear_batch(registry); int ret = taos_collector_registry_clear_batch(registry);
if (ret){ if (ret) {
tscError("failed to clear registry, ret:%d", ret); tscError("failed to clear registry, ret:%d", ret);
} }
} }
@ -215,7 +215,8 @@ static void reportSendProcess(void* param, void* tmrId) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
bool reset = taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); bool reset =
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset); tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
taosRUnLockLatch(&monitorLock); taosRUnLockLatch(&monitorLock);
} }
@ -265,7 +266,7 @@ void monitorCreateClient(int64_t clusterId) {
} }
int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
if (r){ if (r) {
tscError("failed to register collector, ret:%d", r); tscError("failed to register collector, ret:%d", r);
goto fail; goto fail;
} }
@ -318,7 +319,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) { if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector"); tscError("failed to add metric to collector");
int r = taos_counter_destroy(newCounter); int r = taos_counter_destroy(newCounter);
if (r){ if (r) {
tscError("failed to destroy counter, code: %d", r); tscError("failed to destroy counter, code: %d", r);
} }
goto end; goto end;
@ -326,7 +327,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char*
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor"); tscError("failed to put counter to monitor");
int r = taos_counter_destroy(newCounter); int r = taos_counter_destroy(newCounter);
if (r){ if (r) {
tscError("failed to destroy counter, code: %d", r); tscError("failed to destroy counter, code: %d", r);
} }
goto end; goto end;
@ -394,7 +395,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
if (pClient == NULL) { if (pClient == NULL) {
tscError("failed to allocate memory for slow log client"); tscError("failed to allocate memory for slow log client");
int32_t ret = taosCloseFile(&pFile); int32_t ret = taosCloseFile(&pFile);
if (ret != 0){ if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret); tscError("failed to close file:%p ret:%d", pFile, ret);
} }
return; return;
@ -406,7 +407,7 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId);
int32_t ret = taosCloseFile(&pFile); int32_t ret = taosCloseFile(&pFile);
if (ret != 0){ if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret); tscError("failed to close file:%p ret:%d", pFile, ret);
} }
taosMemoryFree(pClient); taosMemoryFree(pClient);
@ -635,7 +636,7 @@ static void processFileRemoved(SlowLogClient* pClient) {
return; return;
} }
int32_t ret = taosCloseFile(&(pClient->pFile)); int32_t ret = taosCloseFile(&(pClient->pFile));
if (ret != 0){ if (ret != 0) {
tscError("failed to close file:%p ret:%d", pClient->pFile, ret); tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
return; return;
} }
@ -728,7 +729,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
if (taosLockFile(pFile) < 0) { if (taosLockFile(pFile) < 0) {
tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
int32_t ret = taosCloseFile(&pFile); int32_t ret = taosCloseFile(&pFile);
if (ret != 0){ if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret); tscError("failed to close file:%p ret:%d", pFile, ret);
} }
continue; continue;
@ -749,7 +750,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
} }
int32_t ret = taosCloseDir(&pDir); int32_t ret = taosCloseDir(&pDir);
if (ret != 0){ if (ret != 0) {
tscError("failed to close dir, ret:%d", ret); tscError("failed to close dir, ret:%d", ret);
} }
} }
@ -831,7 +832,7 @@ static int32_t tscMonitortInit() {
static void tscMonitorStop() { static void tscMonitorStop() {
if (taosCheckPthreadValid(monitorThread)) { if (taosCheckPthreadValid(monitorThread)) {
(void)taosThreadJoin(monitorThread, NULL); (void)taosThreadJoin(monitorThread, NULL);
(void)taosThreadClear(&monitorThread); taosThreadClear(&monitorThread);
} }
} }
@ -897,7 +898,7 @@ void monitorClose() {
taosHashCleanup(monitorSlowLogHash); taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer); taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue); taosCloseQueue(monitorQueue);
if(tsem2_destroy(&monitorSem) != 0) { if (tsem2_destroy(&monitorSem) != 0) {
tscError("failed to destroy semaphore"); tscError("failed to destroy semaphore");
} }
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
@ -921,7 +922,7 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
queueTypeStr[slowLogData->type], slowLogData->data); queueTypeStr[slowLogData->type], slowLogData->data);
if (taosWriteQitem(monitorQueue, slowLogData) == 0) { if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
if(tsem2_post(&monitorSem) != 0) { if (tsem2_post(&monitorSem) != 0) {
tscError("failed to post semaphore"); tscError("failed to post semaphore");
} }
} else { } else {

View File

@ -347,14 +347,14 @@ int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
void dmStopMonitorThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) { if (taosCheckPthreadValid(pMgmt->monitorThread)) {
(void)taosThreadJoin(pMgmt->monitorThread, NULL); (void)taosThreadJoin(pMgmt->monitorThread, NULL);
(void)taosThreadClear(&pMgmt->monitorThread); taosThreadClear(&pMgmt->monitorThread);
} }
} }
void dmStopAuditThread(SDnodeMgmt *pMgmt) { void dmStopAuditThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->auditThread)) { if (taosCheckPthreadValid(pMgmt->auditThread)) {
(void)taosThreadJoin(pMgmt->auditThread, NULL); (void)taosThreadJoin(pMgmt->auditThread, NULL);
(void)taosThreadClear(&pMgmt->auditThread); taosThreadClear(&pMgmt->auditThread);
} }
} }
@ -385,7 +385,7 @@ void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->crashReportThread)) { if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
(void)taosThreadJoin(pMgmt->crashReportThread, NULL); (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
(void)taosThreadClear(&pMgmt->crashReportThread); taosThreadClear(&pMgmt->crashReportThread);
} }
} }

View File

@ -112,7 +112,7 @@ void tEndEncode(SEncoder* pCoder) {
pCoder->size = pNode->size; pCoder->size = pNode->size;
pCoder->pos = pNode->pos; pCoder->pos = pNode->pos;
(void)tEncodeI32(pCoder, len); int32_t ret = tEncodeI32(pCoder, len);
pCoder->pos += len; pCoder->pos += len;
} }

View File

@ -391,14 +391,13 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) { int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) {
terrno = 0; terrno = 0;
(void)taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false); void *data = taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false);
return terrno; return terrno;
} }
int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t *size) { int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t *size) {
terrno = 0; terrno = 0;
void *data = taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
(void)taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
return terrno; return terrno;
} }

View File

@ -257,10 +257,11 @@ static PriorityQueueNode* pqHeapify(PriorityQueue* pq, size_t from, size_t last)
static void pqBuildHeap(PriorityQueue* pq) { static void pqBuildHeap(PriorityQueue* pq) {
if (pqContainerSize(pq) > 1) { if (pqContainerSize(pq) > 1) {
PriorityQueueNode* node;
for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) { for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
(void)pqHeapify(pq, i, pqContainerSize(pq)); node = pqHeapify(pq, i, pqContainerSize(pq));
} }
(void)pqHeapify(pq, 0, pqContainerSize(pq)); node = pqHeapify(pq, 0, pqContainerSize(pq));
} }
} }
@ -274,23 +275,24 @@ static PriorityQueueNode* pqReverseHeapify(PriorityQueue* pq, size_t i) {
} }
static void pqUpdate(PriorityQueue* pq, size_t i) { static void pqUpdate(PriorityQueue* pq, size_t i) {
PriorityQueueNode* node;
if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) { if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
// if value in pos i is smaller than parent, heapify down from i to the end // if value in pos i is smaller than parent, heapify down from i to the end
(void)pqHeapify(pq, i, pqContainerSize(pq)); node = pqHeapify(pq, i, pqContainerSize(pq));
} else { } else {
// if value in pos i is big than parent, heapify up from i // if value in pos i is big than parent, heapify up from i
(void)pqReverseHeapify(pq, i); node = pqReverseHeapify(pq, i);
} }
} }
static void pqRemove(PriorityQueue* pq, size_t i) { static void pqRemove(PriorityQueue* pq, size_t i) {
if (i == pqContainerSize(pq) - 1) { if (i == pqContainerSize(pq) - 1) {
(void)taosArrayPop(pq->container); void* tmp = taosArrayPop(pq->container);
return; return;
} }
taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1)); taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
(void)taosArrayPop(pq->container); void* tmp = taosArrayPop(pq->container);
pqUpdate(pq, i); pqUpdate(pq, i);
} }

View File

@ -305,7 +305,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr
SLRUEntry *old = shard->lru.next; SLRUEntry *old = shard->lru.next;
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
(void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
@ -529,7 +529,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
while (shard->lru.next != &shard->lru) { while (shard->lru.next != &shard->lru) {
SLRUEntry *old = shard->lru.next; SLRUEntry *old = shard->lru.next;
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
(void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
@ -574,7 +574,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
lastReference = taosLRUEntryUnref(e); lastReference = taosLRUEntryUnref(e);
if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) { if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
if (shard->usage > shard->capacity || eraseIfLastRef) { if (shard->usage > shard->capacity || eraseIfLastRef) {
(void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); SLRUEntry *tentry = taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
} else { } else {
taosLRUCacheShardLRUInsert(shard, e); taosLRUCacheShardLRUInsert(shard, e);

View File

@ -32,9 +32,9 @@ static SSkipListNode *tSkipListNewNode(uint8_t level);
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward, static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup); bool hasDup);
static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList); static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList); static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList);
static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList); static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList);
static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList); static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList);
SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags, SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, __compar_fn_t comparFn, uint8_t flags,
@ -103,7 +103,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
void tSkipListDestroy(SSkipList *pSkipList) { void tSkipListDestroy(SSkipList *pSkipList) {
if (pSkipList == NULL) return; if (pSkipList == NULL) return;
(void)tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
SSkipListNode *pNode = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, 0); SSkipListNode *pNode = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, 0);
@ -113,7 +113,7 @@ void tSkipListDestroy(SSkipList *pSkipList) {
tSkipListFreeNode(pTemp); tSkipListFreeNode(pTemp);
} }
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
if (pSkipList->lock != NULL) { if (pSkipList->lock != NULL) {
(void)taosThreadRwlockDestroy(pSkipList->lock); (void)taosThreadRwlockDestroy(pSkipList->lock);
taosMemoryFreeClear(pSkipList->lock); taosMemoryFreeClear(pSkipList->lock);
@ -130,12 +130,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
SSkipListNode *pNode = NULL; SSkipListNode *pNode = NULL;
(void)tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return pNode; return pNode;
} }
@ -293,11 +293,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *
return iter; return iter;
} }
(void)tSkipListRLock(pSkipList); tSkipListRLock(pSkipList);
iter->cur = getPriorNode(pSkipList, val, order, &(iter->next)); iter->cur = getPriorNode(pSkipList, val, order, &(iter->next));
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return iter; return iter;
} }
@ -307,13 +307,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
SSkipList *pSkipList = iter->pSkipList; SSkipList *pSkipList = iter->pSkipList;
(void)tSkipListRLock(pSkipList); tSkipListRLock(pSkipList);
if (iter->order == TSDB_ORDER_ASC) { if (iter->order == TSDB_ORDER_ASC) {
// no data in the skip list // no data in the skip list
if (iter->cur == pSkipList->pTail || iter->next == NULL) { if (iter->cur == pSkipList->pTail || iter->next == NULL) {
iter->cur = pSkipList->pTail; iter->cur = pSkipList->pTail;
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return false; return false;
} }
@ -329,7 +329,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
} else { } else {
if (iter->cur == pSkipList->pHead) { if (iter->cur == pSkipList->pHead) {
iter->cur = pSkipList->pHead; iter->cur = pSkipList->pHead;
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return false; return false;
} }
@ -344,7 +344,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
iter->step++; iter->step++;
} }
(void)tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead); return (iter->order == TSDB_ORDER_ASC) ? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead);
} }
@ -413,25 +413,31 @@ static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t
return iter; return iter;
} }
static FORCE_INLINE int32_t tSkipListWLock(SSkipList *pSkipList) { static FORCE_INLINE void tSkipListWLock(SSkipList *pSkipList) {
if (pSkipList->lock) { if (pSkipList->lock) {
return taosThreadRwlockWrlock(pSkipList->lock); if (taosThreadRwlockWrlock(pSkipList->lock) != 0) {
uError("failed to lock skip list");
}
} }
return 0; return;
} }
static FORCE_INLINE int32_t tSkipListRLock(SSkipList *pSkipList) { static FORCE_INLINE void tSkipListRLock(SSkipList *pSkipList) {
if (pSkipList->lock) { if (pSkipList->lock) {
return taosThreadRwlockRdlock(pSkipList->lock); if (taosThreadRwlockRdlock(pSkipList->lock) != 0) {
uError("failed to lock skip list");
}
} }
return 0; return;
} }
static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) { static FORCE_INLINE void tSkipListUnlock(SSkipList *pSkipList) {
if (pSkipList->lock) { if (pSkipList->lock) {
return taosThreadRwlockUnlock(pSkipList->lock); if (taosThreadRwlockUnlock(pSkipList->lock) != 0) {
uError("failed to unlock skip list");
}
} }
return 0; return;
} }
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {

View File

@ -59,7 +59,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
uInfo("worker:%s:%d is stopped", pool->name, worker->id); uInfo("worker:%s:%d is stopped", pool->name, worker->id);
} }
} }
@ -77,7 +77,11 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
void *msg = NULL; void *msg = NULL;
int32_t code = 0; int32_t code = 0;
(void)taosBlockSIGPIPE(); int32_t ret = taosBlockSIGPIPE();
if (ret < 0) {
uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
}
setThreadName(pool->name); setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId(); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@ -122,7 +126,13 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
(void)taosThreadMutexLock(&pool->mutex); (void)taosThreadMutexLock(&pool->mutex);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
(void)taosAddIntoQset(pool->qset, queue, ahandle); code = taosAddIntoQset(pool->qset, queue, ahandle);
if (code) {
taosCloseQueue(queue);
(void)taosThreadMutexUnlock(&pool->mutex);
terrno = code;
return NULL;
}
// spawn a thread to process queue // spawn a thread to process queue
if (pool->num < pool->max) { if (pool->num < pool->max) {
@ -191,7 +201,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
uInfo("worker:%s:%d is stopped", pool->name, worker->id); uInfo("worker:%s:%d is stopped", pool->name, worker->id);
} }
taosMemoryFree(worker); taosMemoryFree(worker);
@ -210,7 +220,11 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
void *msg = NULL; void *msg = NULL;
int32_t code = 0; int32_t code = 0;
(void)taosBlockSIGPIPE(); int32_t ret = taosBlockSIGPIPE();
if (ret < 0) {
uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
}
setThreadName(pool->name); setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId(); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@ -254,7 +268,14 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
(void)taosThreadMutexLock(&pool->mutex); (void)taosThreadMutexLock(&pool->mutex);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
(void)taosAddIntoQset(pool->qset, queue, ahandle);
code = taosAddIntoQset(pool->qset, queue, ahandle);
if (code) {
taosCloseQueue(queue);
(void)taosThreadMutexUnlock(&pool->mutex);
terrno = code;
return NULL;
}
int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers); int32_t curWorkerNum = taosArrayGetSize(pool->workers);
@ -281,7 +302,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) { if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum); uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
(void)taosArrayPop(pool->workers); void *tmp = taosArrayPop(pool->workers);
taosMemoryFree(worker); taosMemoryFree(worker);
taosCloseQueue(queue); taosCloseQueue(queue);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -342,7 +363,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id); uInfo("worker:%s:%d is stopping", pool->name, worker->id);
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
taosFreeQall(worker->qall); taosFreeQall(worker->qall);
taosCloseQset(worker->qset); taosCloseQset(worker->qset);
uInfo("worker:%s:%d is stopped", pool->name, worker->id); uInfo("worker:%s:%d is stopped", pool->name, worker->id);
@ -362,7 +383,11 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
int32_t code = 0; int32_t code = 0;
int32_t numOfMsgs = 0; int32_t numOfMsgs = 0;
(void)taosBlockSIGPIPE(); int32_t ret = taosBlockSIGPIPE();
if (ret < 0) {
uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
}
setThreadName(pool->name); setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId(); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@ -407,7 +432,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
code = taosOpenQset(&worker->qset); code = taosOpenQset(&worker->qset);
if (code) goto _OVER; if (code) goto _OVER;
(void)taosAddIntoQset(worker->qset, queue, ahandle); code = taosAddIntoQset(worker->qset, queue, ahandle);
if (code) goto _OVER;
code = taosAllocateQall(&worker->qall); code = taosAllocateQall(&worker->qall);
if (code) goto _OVER; if (code) goto _OVER;
@ -423,7 +449,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
pool->num++; pool->num++;
if (pool->num > pool->max) pool->num = pool->max; if (pool->num > pool->max) pool->num = pool->max;
} else { } else {
(void)taosAddIntoQset(worker->qset, queue, ahandle); code = taosAddIntoQset(worker->qset, queue, ahandle);
if (code) goto _OVER;
pool->nextId = (pool->nextId + 1) % pool->max; pool->nextId = (pool->nextId + 1) % pool->max;
} }
@ -551,7 +578,7 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) {
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool); static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool); static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
#define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) #define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32)
@ -629,7 +656,11 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
void *msg = NULL; void *msg = NULL;
int32_t code = 0; int32_t code = 0;
(void)taosBlockSIGPIPE(); int32_t ret = taosBlockSIGPIPE();
if (ret < 0) {
uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
}
setThreadName(pool->name); setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId(); worker->pid = taosGetSelfPthreadId();
uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
@ -648,7 +679,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
} }
(void)tQueryAutoQWorkerWaitingCheck(pool); tQueryAutoQWorkerWaitingCheck(pool);
if (qinfo.fp != NULL) { if (qinfo.fp != NULL) {
qinfo.workerId = worker->id; qinfo.workerId = worker->id;
@ -717,13 +748,13 @@ static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
return false; return false;
} }
static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) { static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
while (1) { while (1) {
int64_t val64 = pPool->activeRunningN; int64_t val64 = pPool->activeRunningN;
int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
while (running < pPool->num) { while (running < pPool->num) {
if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) { if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
return TSDB_CODE_SUCCESS; return;
} }
} }
if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) { if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
@ -736,7 +767,7 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
// recovered from waiting // recovered from waiting
(void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
return TSDB_CODE_SUCCESS; return;
} }
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) { bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
@ -744,7 +775,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) { tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
(void)taosThreadMutexLock(&pPool->poolLock); (void)taosThreadMutexLock(&pPool->poolLock);
SListNode *pNode = listNode(pWorker); SListNode *pNode = listNode(pWorker);
(void)tdListPopNode(pPool->workers, pNode); SListNode *tNode = tdListPopNode(pPool->workers, pNode);
// reclaim some workers // reclaim some workers
if (pWorker->id >= pPool->maxInUse) { if (pWorker->id >= pPool->maxInUse) {
while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) { while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
@ -752,7 +783,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data; SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data;
if (pWorker && taosCheckPthreadValid(pWorker->thread)) { if (pWorker && taosCheckPthreadValid(pWorker->thread)) {
(void)taosThreadJoin(pWorker->thread, NULL); (void)taosThreadJoin(pWorker->thread, NULL);
(void)taosThreadClear(&pWorker->thread); taosThreadClear(&pWorker->thread);
} }
taosMemoryFree(head); taosMemoryFree(head);
} }
@ -777,7 +808,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
(void)taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
return false; return false;
} }
(void)tdListPopNode(pPool->backupWorkers, pNode); SListNode *tNode1 = tdListPopNode(pPool->backupWorkers, pNode);
tdListAppendNode(pPool->workers, pNode); tdListAppendNode(pPool->workers, pNode);
(void)taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
@ -862,7 +893,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
(void)taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
if (worker && taosCheckPthreadValid(worker->thread)) { if (worker && taosCheckPthreadValid(worker->thread)) {
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
} }
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
@ -872,7 +903,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
worker = (SQueryAutoQWorker *)pNode->data; worker = (SQueryAutoQWorker *)pNode->data;
if (worker && taosCheckPthreadValid(worker->thread)) { if (worker && taosCheckPthreadValid(worker->thread)) {
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
} }
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
@ -882,7 +913,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
worker = (SQueryAutoQWorker *)pNode->data; worker = (SQueryAutoQWorker *)pNode->data;
if (worker && taosCheckPthreadValid(worker->thread)) { if (worker && taosCheckPthreadValid(worker->thread)) {
(void)taosThreadJoin(worker->thread, NULL); (void)taosThreadJoin(worker->thread, NULL);
(void)taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
} }
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
@ -913,7 +944,13 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
(void)taosThreadMutexLock(&pool->poolLock); (void)taosThreadMutexLock(&pool->poolLock);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
(void)taosAddIntoQset(pool->qset, queue, ahandle); code = taosAddIntoQset(pool->qset, queue, ahandle);
if (code) {
taosCloseQueue(queue);
queue = NULL;
(void)taosThreadMutexUnlock(&pool->poolLock);
return NULL;
}
SQueryAutoQWorker worker = {0}; SQueryAutoQWorker worker = {0};
SQueryAutoQWorker *pWorker = NULL; SQueryAutoQWorker *pWorker = NULL;