This commit is contained in:
xsren 2024-09-11 20:25:04 +08:00
parent 8b7b5b7c6d
commit e226898e17
11 changed files with 93 additions and 58 deletions

View File

@ -27,11 +27,11 @@ extern "C" {
// typedef struct tsem_s *tsem_t;
typedef dispatch_semaphore_t tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value);
int32_t tsem_wait(tsem_t *sem);
int32_t tsem_timewait(tsem_t *sim, int64_t milis);
int32_t tsem_post(tsem_t *sem);
int32_t tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
@ -45,11 +45,11 @@ int tsem_destroy(tsem_t *sem);
#define tsem_t HANDLE
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value);
int32_t tsem_wait(tsem_t *sem);
int32_t tsem_timewait(tsem_t *sim, int64_t milis);
int32_t tsem_post(tsem_t *sem);
int32_t tsem_destroy(tsem_t *sem);
#define tsem2_t tsem_t
#define tsem2_init tsem_init
@ -61,11 +61,11 @@ int tsem_destroy(tsem_t *sem);
#else
#define tsem_t sem_t
#define tsem_init sem_init
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
int32_t tsem_init(tsem_t *sem, int pshared, unsigned int value);
int32_t tsem_wait(tsem_t *sem);
int32_t tsem_timewait(tsem_t *sim, int64_t milis);
int32_t tsem_post(tsem_t *sem);
int32_t tsem_destroy(tsem_t *sem);
typedef struct tsem2_t {
TdThreadMutex mutex;

View File

@ -2871,7 +2871,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
}
int32_t code = tsem_init(&param->sem, 0, 0);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
@ -2879,7 +2878,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
code = tsem_wait(&param->sem);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
@ -2910,7 +2908,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
}
int32_t code = tsem_init(&param->sem, 0, 0);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
@ -2918,7 +2915,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
code = tsem_wait(&param->sem);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}

View File

@ -34,19 +34,19 @@ static void processFileInTheEnd(TdFilePtr pFile, char* path) {
return;
}
if (taosFtruncateFile(pFile, 0) != 0) {
tscError("failed to truncate file:%s, errno:%d", path, terrno);
tscError("failed to truncate file:%s, terrno:%d", path, terrno);
return;
}
if (taosUnLockFile(pFile) != 0) {
tscError("failed to unlock file:%s, errno:%d", path, terrno);
tscError("failed to unlock file:%s, terrno:%d", path, terrno);
return;
}
if (taosCloseFile(&(pFile)) != 0) {
tscError("failed to close file:%s, errno:%d", path, errno);
tscError("failed to close file:%s, terrno:%d", path, terrno);
return;
}
if (taosRemoveFile(path) != 0) {
tscError("failed to remove file:%s, errno:%d", path, errno);
tscError("failed to remove file:%s, terrno:%d", path, terrno);
return;
}
}

View File

@ -1412,7 +1412,7 @@ static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileNa
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[s3] open file error, errno:%d, fileName:%s", terrno, fileName);
uError("[s3] open file error, terrno:%d, fileName:%s", terrno, fileName);
TAOS_RETURN(terrno);
}

View File

@ -197,7 +197,6 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
code = taosGetAppName(tmp, NULL);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
TAOS_CHECK_GOTO(code, NULL, _exit);
}
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit);

View File

@ -849,7 +849,7 @@ int32_t catalogInit(SCatalogCfg* cfg) {
}
if (tsem_init(&gCtgMgmt.queue.reqSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
qError("tsem_init failed, terror:%s", tstrerror(terrno));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}

View File

@ -3239,7 +3239,7 @@ void *ctgUpdateThreadFunc(void *param) {
while (true) {
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
qError("ctg tsem_wait failed, error:%s", tstrerror(terrno));
}
if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {

View File

@ -565,7 +565,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
code = tsem_post(&pExchangeInfo->ready);
if (code != TSDB_CODE_SUCCESS) {
code = TAOS_SYSTEM_ERROR(code);
qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
return code;
}

View File

@ -3087,7 +3087,7 @@ int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
code = tsem_init(sem, 0, 0);
if (code != 0) {
taosMemoryFree(sem);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _RETURN1);
TAOS_CHECK_GOTO(terrno, NULL, _RETURN1);
}
if (pReq->info.traceId.msgId == 0) TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
@ -3250,9 +3250,8 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq,
}
code = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (code < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
code = TSDB_CODE_TIMEOUT_ERROR;
if (code != 0) {
pRsp->code = code;
} else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
pSyncMsg->pRsp->pCont = NULL;
@ -3260,7 +3259,6 @@ int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq,
epsetAssign(pEpSet, &pSyncMsg->epSet);
*epUpdated = 1;
}
code = 0;
}
_RETURN:
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);

View File

@ -48,7 +48,10 @@ int32_t taosGetPId() { return GetCurrentProcessId(); }
int32_t taosGetAppName(char* name, int32_t* len) {
char filepath[1024] = {0};
GetModuleFileName(NULL, filepath, MAX_PATH);
if (GetModuleFileName(NULL, filepath, MAX_PATH) == 0) {
terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
return terrno;
}
char* sub = strrchr(filepath, '.');
if (sub != NULL) {
*sub = '\0';
@ -70,7 +73,12 @@ int32_t taosGetAppName(char* name, int32_t* len) {
}
int32_t tsem_wait(tsem_t* sem) {
return WaitForSingleObject(*sem, INFINITE);
DWORD ret = WaitForSingleObject(*sem, INFINITE);
if(ret == WAIT_OBJECT_0) {
return 0;
} else {
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
}
}
int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) {
@ -78,61 +86,65 @@ int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) {
if (result == WAIT_OBJECT_0) {
return 0; // Semaphore acquired
} else if (result == WAIT_TIMEOUT) {
return -1; // Timeout reached
return TSDB_CODE_TIMEOUT_ERROR; // Timeout reached
} else {
return result;
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
}
}
// Inter-process sharing is not currently supported. The pshared parameter is invalid.
int tsem_init(tsem_t* sem, int pshared, unsigned int value) {
int32_t tsem_init(tsem_t* sem, int pshared, unsigned int value) {
*sem = CreateSemaphore(NULL, value, LONG_MAX, NULL);
return (*sem != NULL) ? 0 : -1;
return (*sem != NULL) ? 0 : TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
}
int tsem_post(tsem_t* sem) {
int32_t tsem_post(tsem_t* sem) {
if (ReleaseSemaphore(*sem, 1, NULL)) return 0;
return -1;
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
}
int tsem_destroy(tsem_t* sem) {
int32_t tsem_destroy(tsem_t* sem) {
if (CloseHandle(*sem)) return 0;
return -1;
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
}
#elif defined(_TD_DARWIN_64)
#include <libproc.h>
int tsem_init(tsem_t *psem, int flags, unsigned int count) {
int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
*psem = dispatch_semaphore_create(count);
if (*psem == NULL) return -1;
if (*psem == NULL) return TAO_SYTAOS_SYSTEM_ERROR(errno);
return 0;
}
int tsem_destroy(tsem_t *psem) {
if (psem == NULL || *psem == NULL) return -1;
int32_t tsem_destroy(tsem_t *psem) {
// if (psem == NULL || *psem == NULL) return -1;
// dispatch_release(*psem);
// *psem = NULL;
return 0;
}
int tsem_post(tsem_t *psem) {
int32_t tsem_post(tsem_t *psem) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_semaphore_signal(*psem);
(void)dispatch_semaphore_signal(*psem);
return 0;
}
int tsem_wait(tsem_t *psem) {
int32_t tsem_wait(tsem_t *psem) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_semaphore_wait(*psem, DISPATCH_TIME_FOREVER);
return 0;
}
int tsem_timewait(tsem_t *psem, int64_t milis) {
int32_t tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
return dispatch_semaphore_wait(*psem, time);
if(dispatch_semaphore_wait(*psem, time) == 0) {
return 0;
} else {
return TSDB_CODE_TIMEOUT_ERROR;
}
}
bool taosCheckPthreadValid(TdThread thread) { return thread != 0; }
@ -216,6 +228,14 @@ int32_t taosGetAppName(char* name, int32_t* len) {
return 0;
}
int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
if(sem_init(psem, flags, count) == 0) {
return 0;
} else {
return TAOS_SYSTEM_ERROR(errno);
}
}
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
int ret = 0;
@ -233,13 +253,16 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) {
continue;
}
if(ETIMEDOUT == errno) {
return TSDB_CODE_TIMEOUT_ERROR;
}
if (-1 == ret) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
return ret;
return 0;
}
int32_t tsem_wait(tsem_t* sem) {
@ -285,6 +308,22 @@ int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) {
return 0;
}
int32_t tsem_post(tsem_t* psem) {
if (sem_post(psem) == 0) {
return 0;
} else {
return TAOS_SYSTEM_ERROR(errno);
}
}
int32_t tsem_destroy(tsem_t *sem) {
if (sem_destroy(sem) == 0) {
return 0;
} else {
return TAOS_SYSTEM_ERROR(errno);
}
}
int tsem2_post(tsem2_t *sem) {
int32_t code = taosThreadMutexLock(&sem->mutex);
if (code) {
@ -364,7 +403,11 @@ int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) {
ret = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &ts);
if (ret != 0) {
(void)taosThreadMutexUnlock(&sem->mutex);
return ret;
if (errno == ETIMEDOUT) {
return TSDB_CODE_TIMEOUT_ERROR;
} else {
return TAOS_SYSTEM_ERROR(errno);
}
}
}
}

View File

@ -150,7 +150,7 @@ void *taosProcessSchedQueue(void *scheduler) {
while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(terrno));
}
if (atomic_load_8(&pSched->stop)) {
break;
@ -169,7 +169,7 @@ void *taosProcessSchedQueue(void *scheduler) {
}
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(terrno));
}
if (msg.fp)
@ -197,7 +197,7 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
}
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(terrno));
}
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
@ -212,7 +212,7 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
}
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(terrno));
}
return ret;
}