diff --git a/include/os/osThread.h b/include/os/osThread.h index aa0dc066c6..3fdafe2c34 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -22,6 +22,15 @@ extern "C" { #endif +#if defined(WINDOWS) && !defined(__USE_PTHREAD) +#include +#define __USE_WIN_THREAD +// https://learn.microsoft.com/en-us/windows/win32/winprog/using-the-windows-headers +// #ifndef _WIN32_WINNT +// #define _WIN32_WINNT 0x0600 +// #endif +#endif + #if !defined(WINDOWS) && !defined(_ALPINE) #ifndef __USE_XOPEN2K #define TD_USE_SPINLOCK_AS_MUTEX @@ -29,6 +38,22 @@ typedef pthread_mutex_t pthread_spinlock_t; #endif #endif +#ifdef __USE_WIN_THREAD +typedef pthread_t TdThread; // pthread api +typedef pthread_spinlock_t TdThreadSpinlock; // pthread api +typedef CRITICAL_SECTION TdThreadMutex; // windows api +typedef HANDLE TdThreadMutexAttr; // windows api +typedef struct { + SRWLOCK lock; + int8_t excl; +} TdThreadRwlock; // pthread api +typedef pthread_attr_t TdThreadAttr; // pthread api +typedef pthread_once_t TdThreadOnce; // pthread api +typedef HANDLE TdThreadRwlockAttr; // windows api +typedef CONDITION_VARIABLE TdThreadCond; // windows api +typedef HANDLE TdThreadCondAttr; // windows api +typedef pthread_key_t TdThreadKey; // pthread api +#else typedef pthread_t TdThread; typedef pthread_spinlock_t TdThreadSpinlock; typedef pthread_mutex_t TdThreadMutex; @@ -40,11 +65,14 @@ typedef pthread_rwlockattr_t TdThreadRwlockAttr; typedef pthread_cond_t TdThreadCond; typedef pthread_condattr_t TdThreadCondAttr; typedef pthread_key_t TdThreadKey; +#endif #define taosThreadCleanupPush pthread_cleanup_push #define taosThreadCleanupPop pthread_cleanup_pop -#ifdef WINDOWS +#if defined(WINDOWS) && !defined(__USE_PTHREAD) +#define TD_PTHREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER_FORBID +#elif defined(WINDOWS) #define TD_PTHREAD_MUTEX_INITIALIZER (TdThreadMutex)(-1) #else #define TD_PTHREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index f7b580345b..0baffcbad6 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -456,10 +456,20 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) return -1; } #ifdef WINDOWS - size_t pos = _lseeki64(pFile->fd, 0, SEEK_CUR); - _lseeki64(pFile->fd, offset, SEEK_SET); - int64_t ret = _read(pFile->fd, buf, count); - _lseeki64(pFile->fd, pos, SEEK_SET); + int64_t ret = 0; + + OVERLAPPED ol = {0}; + ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20); + ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL); + + HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd); + SetLastError(0); + bool result = ReadFile(handle, buf, count, &ret, &ol); + + if (!result && GetLastError() != ERROR_HANDLE_EOF) { + errno = GetLastError(); + return -1; + } #else int64_t ret = pread(pFile->fd, buf, count, offset); #endif @@ -523,10 +533,19 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t return 0; } #ifdef WINDOWS - size_t pos = _lseeki64(pFile->fd, 0, SEEK_CUR); - _lseeki64(pFile->fd, offset, SEEK_SET); - int64_t ret = _write(pFile->fd, buf, count); - _lseeki64(pFile->fd, pos, SEEK_SET); + int64_t ret = 0; + + OVERLAPPED ol = {0}; + pl.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20); + pl.Offset = (uint32_t)(offset & 0xFFFFFFFFLL); + + HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd); + SetLastError(0); + bool result = WriteFile(handle, buf, count, &ret, &ol); + if (!result) { + errno = GetLastError(); + return -1; + } #else int64_t ret = pwrite(pFile->fd, buf, count, offset); #endif diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 4c4e22bdd9..2080677140 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -84,34 +84,97 @@ int32_t taosThreadAttrSetStackSize(TdThreadAttr *attr, size_t stacksize) { int32_t taosThreadCancel(TdThread thread) { return pthread_cancel(thread); } -int32_t taosThreadCondDestroy(TdThreadCond *cond) { return pthread_cond_destroy(cond); } +int32_t taosThreadCondDestroy(TdThreadCond *cond) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_cond_destroy(cond); +#endif +} -int32_t taosThreadCondInit(TdThreadCond *cond, const TdThreadCondAttr *attr) { return pthread_cond_init(cond, attr); } +int32_t taosThreadCondInit(TdThreadCond *cond, const TdThreadCondAttr *attr) { +#ifdef __USE_WIN_THREAD + InitializeConditionVariable(cond); + return 0; +#else + return pthread_cond_init(cond, attr); +#endif +} -int32_t taosThreadCondSignal(TdThreadCond *cond) { return pthread_cond_signal(cond); } +int32_t taosThreadCondSignal(TdThreadCond *cond) { +#ifdef __USE_WIN_THREAD + WakeConditionVariable(cond); + return 0; +#else + return pthread_cond_signal(cond); +#endif +} -int32_t taosThreadCondBroadcast(TdThreadCond *cond) { return pthread_cond_broadcast(cond); } +int32_t taosThreadCondBroadcast(TdThreadCond *cond) { +#ifdef __USE_WIN_THREAD + WakeAllConditionVariable(cond); +#else + return pthread_cond_broadcast(cond); +#endif +} int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { +#ifdef __USE_WIN_THREAD + if (!SleepConditionVariableCS(cond, mutex, INFINITE)) { + return EINVAL; + } + return 0; +#else THREAD_PTR_CHECK(mutex) return pthread_cond_wait(cond, mutex); +#endif } int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) { +#ifdef __USE_WIN_THREAD + if (!abstime) return EINVAL; + if (SleepConditionVariableCS(cond, mutex, (DWORD)(abstime->tv_sec * 1e3 + abstime->tv_nsec / 1e6))) return 0; + if (GetLastError() == ERROR_TIMEOUT) { + return ETIMEDOUT; + } + return EINVAL; +#else THREAD_PTR_CHECK(mutex) return pthread_cond_timedwait(cond, mutex, abstime); +#endif } -int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr) { return pthread_condattr_destroy(attr); } +int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_condattr_destroy(attr); +#endif +} int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared) { +#ifdef __USE_WIN_THREAD + if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE; + return 0; +#else return pthread_condattr_getpshared(attr, pshared); +#endif } -int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { return pthread_condattr_init(attr); } +int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_condattr_init(attr); +#endif +} int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) { +#ifdef __USE_WIN_THREAD + return 0; +#else return pthread_condattr_setpshared(attr, pshared); +#endif } int32_t taosThreadDetach(TdThread thread) { return pthread_detach(thread); } @@ -142,17 +205,39 @@ int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(threa // } int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { +#ifdef __USE_WIN_THREAD + DeleteCriticalSection(mutex); + return 0; +#else THREAD_PTR_CHECK(mutex) return pthread_mutex_destroy(mutex); +#endif } int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) { +#ifdef __USE_WIN_THREAD + /** + * Windows Server 2003 and Windows XP: In low memory situations, InitializeCriticalSection can raise a + * STATUS_NO_MEMORY exception. Starting with Windows Vista, this exception was eliminated and + * InitializeCriticalSection always succeeds, even in low memory situations. + */ + InitializeCriticalSection(mutex); + return 0; +#else return pthread_mutex_init(mutex, attr); +#endif } int32_t taosThreadMutexLock(TdThreadMutex *mutex) { +#ifdef __USE_WIN_THREAD + EnterCriticalSection(mutex); + return 0; +#else THREAD_PTR_CHECK(mutex) - return pthread_mutex_lock(mutex); + int result = pthread_mutex_lock(mutex); + assert(result == 0); + return result; +#endif } // int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) { @@ -160,19 +245,42 @@ int32_t taosThreadMutexLock(TdThreadMutex *mutex) { // } int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { - THREAD_PTR_CHECK(mutex) - return pthread_mutex_trylock(mutex); +#ifdef __USE_WIN_THREAD + if (TryEnterCriticalSection(mutex)) return 0; + return EBUSY; +#else + THREAD_PTR_CHECK(mutex) + return pthread_mutex_trylock(mutex); +#endif } int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { +#ifdef __USE_WIN_THREAD + LeaveCriticalSection(mutex); + return 0; +#else THREAD_PTR_CHECK(mutex) - return pthread_mutex_unlock(mutex); + int result = pthread_mutex_unlock(mutex); + assert(result == 0); + return result; +#endif } -int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { return pthread_mutexattr_destroy(attr); } +int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_mutexattr_destroy(attr); +#endif +} int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr *attr, int32_t *pshared) { +#ifdef __USE_WIN_THREAD + if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE; + return 0; +#else return pthread_mutexattr_getpshared(attr, pshared); +#endif } // int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust) { @@ -180,13 +288,28 @@ int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr *attr, int32_t *ps // } int32_t taosThreadMutexAttrGetType(const TdThreadMutexAttr *attr, int32_t *kind) { +#ifdef __USE_WIN_THREAD + if (kind) *kind = PTHREAD_MUTEX_NORMAL; + return 0; +#else return pthread_mutexattr_gettype(attr, kind); +#endif } -int32_t taosThreadMutexAttrInit(TdThreadMutexAttr *attr) { return pthread_mutexattr_init(attr); } +int32_t taosThreadMutexAttrInit(TdThreadMutexAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_mutexattr_init(attr); +#endif +} int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr *attr, int32_t pshared) { +#ifdef __USE_WIN_THREAD + return 0; +#else return pthread_mutexattr_setpshared(attr, pshared); +#endif } // int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust) { @@ -194,20 +317,46 @@ int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr *attr, int32_t pshared) // } int32_t taosThreadMutexAttrSetType(TdThreadMutexAttr *attr, int32_t kind) { +#ifdef __USE_WIN_THREAD + return 0; +#else return pthread_mutexattr_settype(attr, kind); +#endif } int32_t taosThreadOnce(TdThreadOnce *onceControl, void (*initRoutine)(void)) { return pthread_once(onceControl, initRoutine); } -int32_t taosThreadRwlockDestroy(TdThreadRwlock *rwlock) { return pthread_rwlock_destroy(rwlock); } - -int32_t taosThreadRwlockInit(TdThreadRwlock *rwlock, const TdThreadRwlockAttr *attr) { - return pthread_rwlock_init(rwlock, attr); +int32_t taosThreadRwlockDestroy(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + /* SRWLock does not need explicit destruction so long as there are no waiting threads + * See: https://docs.microsoft.com/windows/win32/api/synchapi/nf-synchapi-initializesrwlock#remarks + */ + return 0; +#else + return pthread_rwlock_destroy(rwlock); +#endif } -int32_t taosThreadRwlockRdlock(TdThreadRwlock *rwlock) { return pthread_rwlock_rdlock(rwlock); } +int32_t taosThreadRwlockInit(TdThreadRwlock *rwlock, const TdThreadRwlockAttr *attr) { +#ifdef __USE_WIN_THREAD + memset(rwlock, 0, sizeof(*rwlock)); + InitializeSRWLock(&rwlock->lock); + return 0; +#else + return pthread_rwlock_init(rwlock, attr); +#endif +} + +int32_t taosThreadRwlockRdlock(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + AcquireSRWLockShared(&rwlock->lock); + return 0; +#else + return pthread_rwlock_rdlock(rwlock); +#endif +} // int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime) { // return pthread_rwlock_timedrdlock(rwlock, abstime); @@ -217,24 +366,79 @@ int32_t taosThreadRwlockRdlock(TdThreadRwlock *rwlock) { return pthread_rwlock_r // return pthread_rwlock_timedwrlock(rwlock, abstime); // } -int32_t taosThreadRwlockTryRdlock(TdThreadRwlock *rwlock) { return pthread_rwlock_tryrdlock(rwlock); } - -int32_t taosThreadRwlockTryWrlock(TdThreadRwlock *rwlock) { return pthread_rwlock_trywrlock(rwlock); } - -int32_t taosThreadRwlockUnlock(TdThreadRwlock *rwlock) { return pthread_rwlock_unlock(rwlock); } - -int32_t taosThreadRwlockWrlock(TdThreadRwlock *rwlock) { return pthread_rwlock_wrlock(rwlock); } - -int32_t taosThreadRwlockAttrDestroy(TdThreadRwlockAttr *attr) { return pthread_rwlockattr_destroy(attr); } - -int32_t taosThreadRwlockAttrGetPshared(const TdThreadRwlockAttr *attr, int32_t *pshared) { - return pthread_rwlockattr_getpshared(attr, pshared); +int32_t taosThreadRwlockTryRdlock(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + if (!TryAcquireSRWLockShared(&rwlock->lock)) return EBUSY; + return 0; +#else + return pthread_rwlock_tryrdlock(rwlock); +#endif } -int32_t taosThreadRwlockAttrInit(TdThreadRwlockAttr *attr) { return pthread_rwlockattr_init(attr); } +int32_t taosThreadRwlockTryWrlock(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + if (!TryAcquireSRWLockExclusive(&rwlock->lock)) return EBUSY; + atomic_store_8(&rwlock->excl, 1); + return 0; +#else + return pthread_rwlock_trywrlock(rwlock); +#endif +} + +int32_t taosThreadRwlockUnlock(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + if (1 == atomic_val_compare_exchange_8(&rwlock->excl, 1, 0)) { + ReleaseSRWLockExclusive(&rwlock->lock); + } else { + ReleaseSRWLockShared(&rwlock->lock); + } + return 0; +#else + return pthread_rwlock_unlock(rwlock); +#endif +} + +int32_t taosThreadRwlockWrlock(TdThreadRwlock *rwlock) { +#ifdef __USE_WIN_THREAD + AcquireSRWLockExclusive(&rwlock->lock); + atomic_store_8(&rwlock->excl, 1); + return 0; +#else + return pthread_rwlock_wrlock(rwlock); +#endif +} + +int32_t taosThreadRwlockAttrDestroy(TdThreadRwlockAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_rwlockattr_destroy(attr); +#endif +} + +int32_t taosThreadRwlockAttrGetPshared(const TdThreadRwlockAttr *attr, int32_t *pshared) { +#ifdef __USE_WIN_THREAD + if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE; + return 0; +#else + return pthread_rwlockattr_getpshared(attr, pshared); +#endif +} + +int32_t taosThreadRwlockAttrInit(TdThreadRwlockAttr *attr) { +#ifdef __USE_WIN_THREAD + return 0; +#else + return pthread_rwlockattr_init(attr); +#endif +} int32_t taosThreadRwlockAttrSetPshared(TdThreadRwlockAttr *attr, int32_t pshared) { +#ifdef __USE_WIN_THREAD + return 0; +#else return pthread_rwlockattr_setpshared(attr, pshared); +#endif } TdThread taosThreadSelf(void) { return pthread_self(); } @@ -297,4 +501,4 @@ int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { void taosThreadTestCancel(void) { return pthread_testcancel(); } -void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } \ No newline at end of file +void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 28d9b412a0..392ac5d8b2 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -25,7 +25,7 @@ static TdThread cacheRefreshWorker = {0}; static TdThreadOnce cacheThreadInit = PTHREAD_ONCE_INIT; -static TdThreadMutex guard = TD_PTHREAD_MUTEX_INITIALIZER; +static TdThreadMutex guard; static SArray *pCacheArrayList = NULL; static bool stopRefreshWorker = false; static bool refreshWorkerNormalStopped = false; @@ -155,6 +155,8 @@ static void *taosCacheTimedRefresh(void *handle); static void doInitRefreshThread(void) { pCacheArrayList = taosArrayInit(4, POINTER_BYTES); + taosThreadMutexInit(&guard, NULL); + TdThreadAttr thattr; taosThreadAttrInit(&thattr); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);