From e226898e1751d35726a97209bc0b726536d5177e Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Wed, 11 Sep 2024 20:25:04 +0800 Subject: [PATCH] semphore --- include/os/osSemaphore.h | 30 ++++---- source/client/src/clientImpl.c | 4 - source/client/src/clientMonitor.c | 8 +- source/common/src/cos.c | 2 +- source/common/src/tmisce.c | 1 - source/libs/catalog/src/catalog.c | 2 +- source/libs/catalog/src/ctgCache.c | 2 +- source/libs/executor/src/exchangeoperator.c | 1 - source/libs/transport/src/transCli.c | 8 +- source/os/src/osSemaphore.c | 85 ++++++++++++++++----- source/util/src/tsched.c | 8 +- 11 files changed, 93 insertions(+), 58 deletions(-) diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 5b46706790..d893f42740 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -27,11 +27,11 @@ extern "C" { // typedef struct tsem_s *tsem_t; typedef dispatch_semaphore_t tsem_t; -int tsem_init(tsem_t *sem, int pshared, unsigned int value); -int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t milis); -int tsem_post(tsem_t *sem); -int tsem_destroy(tsem_t *sem); +int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value); +int32_t tsem_wait(tsem_t *sem); +int32_t tsem_timewait(tsem_t *sim, int64_t milis); +int32_t tsem_post(tsem_t *sem); +int32_t tsem_destroy(tsem_t *sem); #define tsem2_t tsem_t #define tsem2_init tsem_init @@ -45,11 +45,11 @@ int tsem_destroy(tsem_t *sem); #define tsem_t HANDLE -int tsem_init(tsem_t *sem, int pshared, unsigned int value); -int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t milis); -int tsem_post(tsem_t *sem); -int tsem_destroy(tsem_t *sem); +int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value); +int32_t tsem_wait(tsem_t *sem); +int32_t tsem_timewait(tsem_t *sim, int64_t milis); +int32_t tsem_post(tsem_t *sem); +int32_t tsem_destroy(tsem_t *sem); #define tsem2_t tsem_t #define tsem2_init tsem_init @@ -61,11 +61,11 @@ int tsem_destroy(tsem_t *sem); #else #define tsem_t sem_t -#define tsem_init sem_init -int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t milis); -#define tsem_post sem_post -#define tsem_destroy sem_destroy +int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value); +int32_t tsem_wait(tsem_t *sem); +int32_t tsem_timewait(tsem_t *sim, int64_t milis); +int32_t tsem_post(tsem_t *sem); +int32_t tsem_destroy(tsem_t *sem); typedef struct tsem2_t { TdThreadMutex mutex; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d77b8dcbb7..000ec60718 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2871,7 +2871,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s } int32_t code = tsem_init(¶m->sem, 0, 0); if (TSDB_CODE_SUCCESS != code) { - terrno = code; taosMemoryFree(param); return NULL; } @@ -2879,7 +2878,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source); code = tsem_wait(¶m->sem); if (TSDB_CODE_SUCCESS != code) { - terrno = code; taosMemoryFree(param); return NULL; } @@ -2910,7 +2908,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, } int32_t code = tsem_init(¶m->sem, 0, 0); if (TSDB_CODE_SUCCESS != code) { - terrno = code; taosMemoryFree(param); return NULL; } @@ -2918,7 +2915,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid); code = tsem_wait(¶m->sem); if (TSDB_CODE_SUCCESS != code) { - terrno = code; taosMemoryFree(param); return NULL; } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index b890b61668..90893ba106 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -34,19 +34,19 @@ static void processFileInTheEnd(TdFilePtr pFile, char* path) { return; } if (taosFtruncateFile(pFile, 0) != 0) { - tscError("failed to truncate file:%s, errno:%d", path, terrno); + tscError("failed to truncate file:%s, terrno:%d", path, terrno); return; } if (taosUnLockFile(pFile) != 0) { - tscError("failed to unlock file:%s, errno:%d", path, terrno); + tscError("failed to unlock file:%s, terrno:%d", path, terrno); return; } if (taosCloseFile(&(pFile)) != 0) { - tscError("failed to close file:%s, errno:%d", path, errno); + tscError("failed to close file:%s, terrno:%d", path, terrno); return; } if (taosRemoveFile(path) != 0) { - tscError("failed to remove file:%s, errno:%d", path, errno); + tscError("failed to remove file:%s, terrno:%d", path, terrno); return; } } diff --git a/source/common/src/cos.c b/source/common/src/cos.c index d3b3f1f87d..82b1fad2ce 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1412,7 +1412,7 @@ static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileNa TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - uError("[s3] open file error, errno:%d, fileName:%s", terrno, fileName); + uError("[s3] open file error, terrno:%d, fileName:%s", terrno, fileName); TAOS_RETURN(terrno); } diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index f3f4b29617..e40db7b4cf 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -197,7 +197,6 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t code = taosGetAppName(tmp, NULL); if (code != 0) { - code = TAOS_SYSTEM_ERROR(errno); TAOS_CHECK_GOTO(code, NULL, _exit); } TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 0b0cb6dc91..11e1b1221b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -849,7 +849,7 @@ int32_t catalogInit(SCatalogCfg* cfg) { } if (tsem_init(&gCtgMgmt.queue.reqSem, 0, 0)) { - qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + qError("tsem_init failed, terror:%s", tstrerror(terrno)); CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 01201a2480..fe353b0eda 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3239,7 +3239,7 @@ void *ctgUpdateThreadFunc(void *param) { while (true) { if (tsem_wait(&gCtgMgmt.queue.reqSem)) { - qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + qError("ctg tsem_wait failed, error:%s", tstrerror(terrno)); } if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 457aa8ff2b..5352ea37ae 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -565,7 +565,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pSourceDataInfo->status = EX_SOURCE_DATA_READY; code = tsem_post(&pExchangeInfo->ready); if (code != TSDB_CODE_SUCCESS) { - code = TAOS_SYSTEM_ERROR(code); qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); return code; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 78320c450c..ea30f0c112 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -3087,7 +3087,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra code = tsem_init(sem, 0, 0); if (code != 0) { taosMemoryFree(sem); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1); + TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); } if (pReq->info.traceId.msgId == 0) TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -3250,9 +3250,8 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, } code = tsem2_timewait(pSyncMsg->pSem, timeoutMs); - if (code < 0) { - pRsp->code = TSDB_CODE_TIMEOUT_ERROR; - code = TSDB_CODE_TIMEOUT_ERROR; + if (code != 0) { + pRsp->code = code; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); pSyncMsg->pRsp->pCont = NULL; @@ -3260,7 +3259,6 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, epsetAssign(pEpSet, &pSyncMsg->epSet); *epUpdated = 1; } - code = 0; } _RETURN: (void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index e959174f11..4496ba011f 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -48,7 +48,10 @@ int32_t taosGetPId() { return GetCurrentProcessId(); } int32_t taosGetAppName(char* name, int32_t* len) { char filepath[1024] = {0}; - GetModuleFileName(NULL, filepath, MAX_PATH); + if (GetModuleFileName(NULL, filepath, MAX_PATH) == 0) { + terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + return terrno; + } char* sub = strrchr(filepath, '.'); if (sub != NULL) { *sub = '\0'; @@ -70,7 +73,12 @@ int32_t taosGetAppName(char* name, int32_t* len) { } int32_t tsem_wait(tsem_t* sem) { - return WaitForSingleObject(*sem, INFINITE); + DWORD ret = WaitForSingleObject(*sem, INFINITE); + if(ret == WAIT_OBJECT_0) { + return 0; + } else { + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + } } int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) { @@ -78,61 +86,65 @@ int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) { if (result == WAIT_OBJECT_0) { return 0; // Semaphore acquired } else if (result == WAIT_TIMEOUT) { - return -1; // Timeout reached + return TSDB_CODE_TIMEOUT_ERROR; // Timeout reached } else { - return result; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } } // Inter-process sharing is not currently supported. The pshared parameter is invalid. -int tsem_init(tsem_t* sem, int pshared, unsigned int value) { +int32_t tsem_init(tsem_t* sem, int pshared, unsigned int value) { *sem = CreateSemaphore(NULL, value, LONG_MAX, NULL); - return (*sem != NULL) ? 0 : -1; + return (*sem != NULL) ? 0 : TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } -int tsem_post(tsem_t* sem) { +int32_t tsem_post(tsem_t* sem) { if (ReleaseSemaphore(*sem, 1, NULL)) return 0; - return -1; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } -int tsem_destroy(tsem_t* sem) { +int32_t tsem_destroy(tsem_t* sem) { if (CloseHandle(*sem)) return 0; - return -1; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } #elif defined(_TD_DARWIN_64) #include -int tsem_init(tsem_t *psem, int flags, unsigned int count) { +int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) { *psem = dispatch_semaphore_create(count); - if (*psem == NULL) return -1; + if (*psem == NULL) return TAO_SYTAOS_SYSTEM_ERROR(errno); return 0; } -int tsem_destroy(tsem_t *psem) { - if (psem == NULL || *psem == NULL) return -1; +int32_t tsem_destroy(tsem_t *psem) { + // if (psem == NULL || *psem == NULL) return -1; // dispatch_release(*psem); // *psem = NULL; return 0; } -int tsem_post(tsem_t *psem) { +int32_t tsem_post(tsem_t *psem) { if (psem == NULL || *psem == NULL) return -1; - dispatch_semaphore_signal(*psem); + (void)dispatch_semaphore_signal(*psem); return 0; } -int tsem_wait(tsem_t *psem) { +int32_t tsem_wait(tsem_t *psem) { if (psem == NULL || *psem == NULL) return -1; dispatch_semaphore_wait(*psem, DISPATCH_TIME_FOREVER); return 0; } -int tsem_timewait(tsem_t *psem, int64_t milis) { +int32_t tsem_timewait(tsem_t *psem, int64_t milis) { if (psem == NULL || *psem == NULL) return -1; dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC)); - return dispatch_semaphore_wait(*psem, time); + if(dispatch_semaphore_wait(*psem, time) == 0) { + return 0; + } else { + return TSDB_CODE_TIMEOUT_ERROR; + } } bool taosCheckPthreadValid(TdThread thread) { return thread != 0; } @@ -216,6 +228,14 @@ int32_t taosGetAppName(char* name, int32_t* len) { return 0; } +int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) { + if(sem_init(psem, flags, count) == 0) { + return 0; + } else { + return TAOS_SYSTEM_ERROR(errno); + } +} + int32_t tsem_timewait(tsem_t* sem, int64_t ms) { int ret = 0; @@ -233,13 +253,16 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) { while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) { continue; } + if(ETIMEDOUT == errno) { + return TSDB_CODE_TIMEOUT_ERROR; + } if (-1 == ret) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } - return ret; + return 0; } int32_t tsem_wait(tsem_t* sem) { @@ -285,6 +308,22 @@ int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) { return 0; } +int32_t tsem_post(tsem_t* psem) { + if (sem_post(psem) == 0) { + return 0; + } else { + return TAOS_SYSTEM_ERROR(errno); + } +} + +int32_t tsem_destroy(tsem_t *sem) { + if (sem_destroy(sem) == 0) { + return 0; + } else { + return TAOS_SYSTEM_ERROR(errno); + } +} + int tsem2_post(tsem2_t *sem) { int32_t code = taosThreadMutexLock(&sem->mutex); if (code) { @@ -364,7 +403,11 @@ int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) { ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts); if (ret != 0) { (void)taosThreadMutexUnlock(&sem->mutex); - return ret; + if (errno == ETIMEDOUT) { + return TSDB_CODE_TIMEOUT_ERROR; + } else { + return TAOS_SYSTEM_ERROR(errno); + } } } } diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 34c74660fc..1686b41038 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -150,7 +150,7 @@ void *taosProcessSchedQueue(void *scheduler) { while (1) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) { - uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); + uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(terrno)); } if (atomic_load_8(&pSched->stop)) { break; @@ -169,7 +169,7 @@ void *taosProcessSchedQueue(void *scheduler) { } if ((ret = tsem_post(&pSched->emptySem)) != 0) { - uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno)); + uFatal("post %s emptySem failed(%s)", pSched->label, strerror(terrno)); } if (msg.fp) @@ -197,7 +197,7 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { } if ((ret = tsem_wait(&pSched->emptySem)) != 0) { - uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); + uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(terrno)); } if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) { @@ -212,7 +212,7 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { } if ((ret = tsem_post(&pSched->fullSem)) != 0) { - uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); + uFatal("post %s fullSem failed(%s)", pSched->label, strerror(terrno)); } return ret; }