diff --git a/cmake/define.inc b/cmake/define.inc index 5d4d94ff42..ba12df6f0c 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -128,6 +128,8 @@ IF (TD_DARWIN_64) SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(DEBUG_FLAGS "-O0 -g3 -DDEBUG") SET(RELEASE_FLAGS "-Og") + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) ENDIF () IF (TD_WINDOWS) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 1d725add21..1e59396c70 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -12,4 +12,8 @@ ADD_SUBDIRECTORY(MsvcLibX) IF (TD_LINUX AND TD_MQTT) ADD_SUBDIRECTORY(MQTT-C) -ENDIF () \ No newline at end of file +ENDIF () + +IF (TD_DARWIN AND TD_MQTT) + ADD_SUBDIRECTORY(MQTT-C) +ENDIF () diff --git a/packaging/tools/install_client.sh b/packaging/tools/install_client.sh index dd116e9bfb..84ca3b5131 100755 --- a/packaging/tools/install_client.sh +++ b/packaging/tools/install_client.sh @@ -21,7 +21,7 @@ else cd ${script_dir} script_dir="$(pwd)" data_dir="/var/lib/taos" - log_dir="~/TDengineLog" + log_dir=~/TDengineLog fi log_link_dir="/usr/local/taos/log" diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 831012851a..0727fe2a1e 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -24,7 +24,7 @@ data_dir="/var/lib/taos" if [ "$osType" != "Darwin" ]; then log_dir="/var/log/taos" else - log_dir="~/TDengineLog" + log_dir=~/TDengineLog fi data_link_dir="/usr/local/taos/data" @@ -178,7 +178,9 @@ function install_bin() { function install_lib() { # Remove links ${csudo} rm -f ${lib_link_dir}/libtaos.* || : - ${csudo} rm -f ${lib64_link_dir}/libtaos.* || : + if [ "$osType" != "Darwin" ]; then + ${csudo} rm -f ${lib64_link_dir}/libtaos.* || : + fi if [ "$osType" != "Darwin" ]; then ${csudo} cp ${binary_dir}/build/lib/libtaos.so.${verNumber} ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/* @@ -190,12 +192,14 @@ function install_lib() { ${csudo} ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so fi else - ${csudo} cp ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/* - ${csudo} ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.1.dylib + ${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/* + ${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib ${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib fi - ${csudo} ldconfig + if [ "$osType" != "Darwin" ]; then + ${csudo} ldconfig + fi } function install_header() { diff --git a/src/balance/CMakeLists.txt b/src/balance/CMakeLists.txt index fdc43ea32f..a3ae74d825 100644 --- a/src/balance/CMakeLists.txt +++ b/src/balance/CMakeLists.txt @@ -11,3 +11,7 @@ AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) ADD_LIBRARY(balance ${SRC}) ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(balance ${SRC}) +ENDIF () diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 4049c0d729..660ad564a5 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -28,6 +28,28 @@ IF (TD_LINUX) ADD_SUBDIRECTORY(tests) +ELSEIF (TD_DARWIN) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) + + # set the static lib name + ADD_LIBRARY(taos_static STATIC ${SRC}) + TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m) + SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") + SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1) + + # generate dynamic library (*.dylib) + ADD_LIBRARY(taos SHARED ${SRC}) + TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m) + SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1) + + #set version of .dylib + #VERSION dylib version + #SOVERSION dylib version + #MESSAGE(STATUS "build version ${TD_VER_NUMBER}") + SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${TD_VER_NUMBER} SOVERSION 1) + + ADD_SUBDIRECTORY(tests) + ELSEIF (TD_WINDOWS) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32) @@ -49,12 +71,12 @@ ELSEIF (TD_DARWIN) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) ADD_LIBRARY(taos_static STATIC ${SRC}) - TARGET_LINK_LIBRARIES(taos_static trpc tutil pthread m) + TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m) SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") # generate dynamic library (*.dylib) ADD_LIBRARY(taos SHARED ${SRC}) - TARGET_LINK_LIBRARIES(taos trpc tutil pthread m) + TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m) SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f9eedff7eb..2f98fbc270 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -13,10 +13,12 @@ * along with this program. If not, see . */ +#ifndef __APPLE__ #define _BSD_SOURCE #define _XOPEN_SOURCE 500 #define _DEFAULT_SOURCE #define _GNU_SOURCE +#endif // __APPLE__ #include "os.h" #include "ttype.h" diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6b4d980945..36146a49b9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -295,6 +295,10 @@ void taos_close(TAOS *taos) { tscDebug("%p HB is freed", pHb); taosReleaseRef(tscObjRef, pHb->self); +#ifdef __APPLE__ + // to satisfy later tsem_destroy in taos_free_result + tsem_init(&pHb->rspSem, 0, 0); +#endif // __APPLE__ taos_free_result(pHb); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d580eccca8..d045000e24 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1942,6 +1942,10 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in } if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) { +#ifdef __APPLE__ + // to satisfy later tsem_destroy in taos_free_result + tsem_init(&pNew->rspSem, 0, 0); +#endif // __APPLE__ tscFreeSqlObj(pNew); return NULL; } @@ -2508,7 +2512,11 @@ bool tscSetSqlOwner(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; // set the sql object owner +#ifdef __APPLE__ + pthread_t threadId = (pthread_t)taosGetSelfPthreadId(); +#else // __APPLE__ uint64_t threadId = taosGetSelfPthreadId(); +#endif // __APPLE__ if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) { pRes->code = TSDB_CODE_QRY_IN_EXEC; return false; diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index 9da831c9c1..34146241a5 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -10,7 +10,17 @@ IF (TD_LINUX) ADD_LIBRARY(tcq ${SRC}) IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(tcq tutil common taos_static) - ELSE () + ELSE () + TARGET_LINK_LIBRARIES(tcq tutil common taos) + ENDIF () + ADD_SUBDIRECTORY(test) +ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(tcq ${SRC}) + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(tcq tutil common taos_static) + ELSE () TARGET_LINK_LIBRARIES(tcq tutil common taos) ENDIF () ADD_SUBDIRECTORY(test) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 699ca00a25..905ad9d920 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -23,7 +23,7 @@ IF (TD_LINUX) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) ENDIF () - + IF (TD_GRANT) TARGET_LINK_LIBRARIES(taosd grant) ENDIF () @@ -47,3 +47,42 @@ IF (TD_LINUX) COMMENT "prepare taosd environment") ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD}) ENDIF () + +IF (TD_DARWIN) + ADD_EXECUTABLE(taosd ${SRC}) + TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync) + + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(taosd taos_static) + ELSE () + TARGET_LINK_LIBRARIES(taosd taos) + ENDIF () + + IF (TD_ACCOUNT) + TARGET_LINK_LIBRARIES(taosd account) + ENDIF () + + IF (TD_GRANT) + TARGET_LINK_LIBRARIES(taosd grant) + ENDIF () + + IF (TD_MQTT) + TARGET_LINK_LIBRARIES(taosd mqtt) + ENDIF () + + # SET(PREPARE_ENV_CMD "prepare_env_cmd") + # SET(PREPARE_ENV_TARGET "prepare_env_target") + # ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} + # POST_BUILD + # COMMAND echo "make test directory" + # DEPENDS taosd + # COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/ + # COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/ + # COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/ + # COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + # COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + # COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + # COMMENT "prepare taosd environment") + # ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD}) +ENDIF () + diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index a4d7e791e6..7a218f901e 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -160,7 +160,11 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { syslog(LOG_INFO, "Shut down signal is %d", signum); syslog(LOG_INFO, "Shutting down TDengine service..."); // clean the system. +#ifdef __APPLE__ + dInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); +#else // __APPLE__ dInfo("shut down signal is %d, sender PID:%d cmdline:%s", signum, sigInfo->si_pid, taosGetCmdlineByPID(sigInfo->si_pid)); +#endif // __APPLE__ // protect the application from receive another signal struct sigaction act = {{0}}; diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index c63536818a..1880e3084c 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -236,6 +236,13 @@ static void sendTelemetryReport() { taosCloseSocket(fd); } +#ifdef __APPLE__ +static int sem_timedwait(tsem_t *sem, struct timespec *to) { + fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__); + abort(); +} +#endif // __APPLE__ + static void* telemetryThread(void* param) { struct timespec end = {0}; clock_gettime(CLOCK_REALTIME, &end); diff --git a/src/kit/shell/CMakeLists.txt b/src/kit/shell/CMakeLists.txt index 45da99e572..c4f3cc5696 100644 --- a/src/kit/shell/CMakeLists.txt +++ b/src/kit/shell/CMakeLists.txt @@ -9,14 +9,14 @@ IF (TD_LINUX) AUX_SOURCE_DIRECTORY(./src SRC) LIST(REMOVE_ITEM SRC ./src/shellWindows.c) LIST(REMOVE_ITEM SRC ./src/shellDarwin.c) - ADD_EXECUTABLE(shell ${SRC}) - + ADD_EXECUTABLE(shell ${SRC}) + IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(shell taos_static) ELSE () TARGET_LINK_LIBRARIES(shell taos) ENDIF () - + SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) ELSEIF (TD_WINDOWS) LIST(APPEND SRC ./src/shellEngine.c) @@ -27,7 +27,7 @@ ELSEIF (TD_WINDOWS) IF (TD_POWER) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME power) - ELSE () + ELSE () SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) ENDIF () ELSEIF (TD_DARWIN) @@ -37,7 +37,10 @@ ELSEIF (TD_DARWIN) LIST(APPEND SRC ./src/shellCommand.c) LIST(APPEND SRC ./src/shellImport.c) ADD_EXECUTABLE(shell ${SRC}) - TARGET_LINK_LIBRARIES(shell taos_static) + # linking with dylib + TARGET_LINK_LIBRARIES(shell taos) + # linking taos statically + # TARGET_LINK_LIBRARIES(shell taos_static) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) ENDIF () diff --git a/src/kit/shell/src/shellDarwin.c b/src/kit/shell/src/shellDarwin.c index ddf7b21bef..dbec3fdb05 100644 --- a/src/kit/shell/src/shellDarwin.c +++ b/src/kit/shell/src/shellDarwin.c @@ -21,6 +21,8 @@ #include "shellCommand.h" #include "tkey.h" +#include "tscLog.h" + #define OPT_ABORT 1 /* �Cabort */ int indicator = 1; @@ -348,6 +350,9 @@ void *shellLoopQuery(void *arg) { reset_terminal_mode(); } while (shellRunCommand(con, command) == 0); + tfree(command); + exitShell(); + pthread_cleanup_pop(1); return NULL; diff --git a/src/kit/taosdemo/CMakeLists.txt b/src/kit/taosdemo/CMakeLists.txt index 91c743939c..f74dbc2de4 100644 --- a/src/kit/taosdemo/CMakeLists.txt +++ b/src/kit/taosdemo/CMakeLists.txt @@ -7,7 +7,7 @@ INCLUDE_DIRECTORIES(inc) IF (TD_LINUX) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(taosdemo ${SRC}) - + IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(taosdemo taos_static) ELSE () @@ -17,4 +17,13 @@ ELSEIF (TD_WINDOWS) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(taosdemo ${SRC}) TARGET_LINK_LIBRARIES(taosdemo taos_static) +ELSEIF (TD_DARWIN) + AUX_SOURCE_DIRECTORY(. SRC) + ADD_EXECUTABLE(taosdemo ${SRC}) + + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(taosdemo taos_static) + ELSE () + TARGET_LINK_LIBRARIES(taosdemo taos) + ENDIF () ENDIF () diff --git a/src/kit/taosdemox/CMakeLists.txt b/src/kit/taosdemox/CMakeLists.txt index 7db6c04b28..3993cb0feb 100644 --- a/src/kit/taosdemox/CMakeLists.txt +++ b/src/kit/taosdemox/CMakeLists.txt @@ -4,22 +4,44 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/libcurl/include) -IF (TD_LINUX) +IF (TD_LINUX) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(taosdemox ${SRC}) - - #find_program(HAVE_CURL NAMES curl) - IF ((NOT TD_ARM_64) AND (NOT TD_ARM_32)) + + #find_program(HAVE_CURL NAMES curl) + IF ((NOT TD_ARM_64) AND (NOT TD_ARM_32)) ADD_DEFINITIONS(-DTD_LOWA_CURL) LINK_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/libcurl/lib) ADD_LIBRARY(curl STATIC IMPORTED) SET_PROPERTY(TARGET curl PROPERTY IMPORTED_LOCATION ${TD_COMMUNITY_DIR}/deps/libcurl/lib/libcurl.a) - TARGET_LINK_LIBRARIES(taosdemox curl) + TARGET_LINK_LIBRARIES(taosdemox curl) ENDIF () - + IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(taosdemox taos_static cJson) ELSE () TARGET_LINK_LIBRARIES(taosdemox taos cJson) ENDIF () ENDIF () + +IF (TD_DARWIN) + # missing a few dependencies, such as + # AUX_SOURCE_DIRECTORY(. SRC) + # ADD_EXECUTABLE(taosdemox ${SRC}) + # + # #find_program(HAVE_CURL NAMES curl) + # IF ((NOT TD_ARM_64) AND (NOT TD_ARM_32)) + # ADD_DEFINITIONS(-DTD_LOWA_CURL) + # LINK_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/libcurl/lib) + # ADD_LIBRARY(curl STATIC IMPORTED) + # SET_PROPERTY(TARGET curl PROPERTY IMPORTED_LOCATION ${TD_COMMUNITY_DIR}/deps/libcurl/lib/libcurl.a) + # TARGET_LINK_LIBRARIES(taosdemox curl) + # ENDIF () + # + # IF (TD_SOMODE_STATIC) + # TARGET_LINK_LIBRARIES(taosdemox taos_static cJson) + # ELSE () + # TARGET_LINK_LIBRARIES(taosdemox taos cJson) + # ENDIF () +ENDIF () + diff --git a/src/kit/taosdump/CMakeLists.txt b/src/kit/taosdump/CMakeLists.txt index dc9ac6d4a7..b50ad85c08 100644 --- a/src/kit/taosdump/CMakeLists.txt +++ b/src/kit/taosdump/CMakeLists.txt @@ -12,5 +12,15 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(taosdump taos_static) ELSE () TARGET_LINK_LIBRARIES(taosdump taos) - ENDIF () + ENDIF () +ENDIF () + +IF (TD_DARWIN) + # missing for macosx + # ADD_EXECUTABLE(taosdump ${SRC}) + # IF (TD_SOMODE_STATIC) + # TARGET_LINK_LIBRARIES(taosdump taos_static) + # ELSE () + # TARGET_LINK_LIBRARIES(taosdump taos) + # ENDIF () ENDIF () diff --git a/src/mnode/CMakeLists.txt b/src/mnode/CMakeLists.txt index 4123098694..ae4a37320c 100644 --- a/src/mnode/CMakeLists.txt +++ b/src/mnode/CMakeLists.txt @@ -9,4 +9,15 @@ IF (TD_LINUX) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(mnode ${SRC}) -ENDIF () \ No newline at end of file +ENDIF () + +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) + + INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(src SRC) + + ADD_LIBRARY(mnode ${SRC}) +ENDIF () + diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 7892918a39..5dee0e1c26 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -138,6 +138,14 @@ void mnodeDecClusterRef(SClusterObj *pCluster) { sdbDecRef(tsClusterSdb, pCluster); } +#ifdef __APPLE__ +bool taosGetSystemUid(char *uid) { + fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__); + abort(); + return false; +} +#endif // __APPLE__ + static int32_t mnodeCreateCluster() { int32_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); if (numOfClusters != 0) return TSDB_CODE_SUCCESS; diff --git a/src/os/inc/eok.h b/src/os/inc/eok.h new file mode 100644 index 0000000000..0874ca975b --- /dev/null +++ b/src/os/inc/eok.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ +#define _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __APPLE__ + +enum EPOLL_EVENTS + { + EPOLLIN = 0x001, +#define EPOLLIN EPOLLIN + EPOLLPRI = 0x002, +#define EPOLLPRI EPOLLPRI + EPOLLOUT = 0x004, +#define EPOLLOUT EPOLLOUT + EPOLLRDNORM = 0x040, +#define EPOLLRDNORM EPOLLRDNORM + EPOLLRDBAND = 0x080, +#define EPOLLRDBAND EPOLLRDBAND + EPOLLWRNORM = 0x100, +#define EPOLLWRNORM EPOLLWRNORM + EPOLLWRBAND = 0x200, +#define EPOLLWRBAND EPOLLWRBAND + EPOLLMSG = 0x400, +#define EPOLLMSG EPOLLMSG + EPOLLERR = 0x008, +#define EPOLLERR EPOLLERR + EPOLLHUP = 0x010, +#define EPOLLHUP EPOLLHUP + EPOLLRDHUP = 0x2000, +#define EPOLLRDHUP EPOLLRDHUP + EPOLLEXCLUSIVE = 1u << 28, +#define EPOLLEXCLUSIVE EPOLLEXCLUSIVE + EPOLLWAKEUP = 1u << 29, +#define EPOLLWAKEUP EPOLLWAKEUP + EPOLLONESHOT = 1u << 30, +#define EPOLLONESHOT EPOLLONESHOT + EPOLLET = 1u << 31 +#define EPOLLET EPOLLET + }; + +/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */ +#define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */ +#define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */ +#define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */ + + +typedef union epoll_data +{ + void *ptr; + int fd; + uint32_t u32; + uint64_t u64; +} epoll_data_t; + +struct epoll_event +{ + uint32_t events; /* Epoll events */ + epoll_data_t data; /* User data variable */ +}; + +int epoll_create(int size); +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); +int epoll_close(int epfd); + +#endif // __APPLE__ + +#ifdef __cplusplus +} +#endif + +#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ + diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h index 1461ec6d3b..0a77ed6895 100644 --- a/src/os/inc/osDarwin.h +++ b/src/os/inc/osDarwin.h @@ -75,11 +75,11 @@ extern "C" { #define TAOS_OS_FUNC_FILE_SENDIFLE #define TAOS_OS_FUNC_SEMPHONE - #define tsem_t dispatch_semaphore_t - int tsem_init(dispatch_semaphore_t *sem, int pshared, unsigned int value); - int tsem_wait(dispatch_semaphore_t *sem); - int tsem_post(dispatch_semaphore_t *sem); - int tsem_destroy(dispatch_semaphore_t *sem); + typedef struct tsem_s *tsem_t; + int tsem_init(tsem_t *sem, int pshared, unsigned int value); + int tsem_wait(tsem_t *sem); + int tsem_post(tsem_t *sem); + int tsem_destroy(tsem_t *sem); #define TAOS_OS_FUNC_SOCKET_SETSOCKETOPT #define TAOS_OS_FUNC_STRING_STR2INT64 @@ -91,7 +91,7 @@ extern "C" { typedef int(*__compar_fn_t)(const void *, const void *); // for send function in tsocket.c -#define MSG_NOSIGNAL 0 +// #define MSG_NOSIGNAL 0 #define SO_NO_CHECK 0x1234 #define SOL_TCP 0x1234 #define TCP_KEEPIDLE 0x1234 @@ -100,6 +100,17 @@ typedef int(*__compar_fn_t)(const void *, const void *); #define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE #endif +int64_t tsosStr2int64(char *str); + +#include "eok.h" + +void taos_block_sigalrm(void); + + + + + + #ifdef __cplusplus } #endif diff --git a/src/os/inc/osDef.h b/src/os/inc/osDef.h index d718bef6da..e15c45ad14 100644 --- a/src/os/inc/osDef.h +++ b/src/os/inc/osDef.h @@ -90,11 +90,12 @@ extern "C" { #ifdef _ISOC11_SOURCE #define threadlocal _Thread_local #elif defined(__APPLE__) - #define threadlocal + #define threadlocal __thread #elif defined(__GNUC__) && !defined(threadlocal) #define threadlocal __thread #else - #define threadlocal + // #define threadlocal + #error please follow with the thread-local implementation on the target platform #endif #ifdef __cplusplus diff --git a/src/os/src/darwin/darwinEnv.c b/src/os/src/darwin/darwinEnv.c index 6adebabec0..da4b32139e 100644 --- a/src/os/src/darwin/darwinEnv.c +++ b/src/os/src/darwin/darwinEnv.c @@ -30,3 +30,4 @@ void osInit() { strcpy(tsScriptDir, "~/TDengine/cfg"); strcpy(tsOsName, "Darwin"); } + diff --git a/src/os/src/darwin/darwinSemphone.c b/src/os/src/darwin/darwinSemphone.c index 97ff543789..a9e470bd70 100644 --- a/src/os/src/darwin/darwinSemphone.c +++ b/src/os/src/darwin/darwinSemphone.c @@ -13,28 +13,269 @@ * along with this program. If not, see . */ +// fail-fast or let-it-crash philosophy +// https://en.wikipedia.org/wiki/Fail-fast +// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere +// experimentally, we follow log-and-crash here + #define _DEFAULT_SOURCE #include "os.h" -int tsem_init(dispatch_semaphore_t *sem, int pshared, unsigned int value) { - *sem = dispatch_semaphore_create(value); - if (*sem == NULL) { - return -1; - } else { - return 0; +// #define SEM_USE_PTHREAD +// #define SEM_USE_POSIX +#define SEM_USE_SEM + +#ifdef SEM_USE_SEM +#include +#include +#include +#include + +static pthread_t sem_thread; +static pthread_once_t sem_once; +static task_t sem_port; +static volatile int sem_inited = 0; +static semaphore_t sem_exit; + +static void* sem_thread_routine(void *arg) { + (void)arg; + sem_port = mach_task_self(); + kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); + if (ret != KERN_SUCCESS) { + fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", basename(__FILE__), __LINE__, __func__); + sem_inited = -1; + return NULL; + } + sem_inited = 1; + semaphore_wait(sem_exit); + return NULL; +} + +static void once_init(void) { + int r = 0; + r = pthread_create(&sem_thread, NULL, sem_thread_routine, NULL); + if (r) { + fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", basename(__FILE__), __LINE__, __func__); + return; + } + while (sem_inited==0) { + ; } } +#endif + +struct tsem_s { +#ifdef SEM_USE_PTHREAD + pthread_mutex_t lock; + pthread_cond_t cond; + volatile int64_t val; +#elif defined(SEM_USE_POSIX) + size_t id; + sem_t *sem; +#elif defined(SEM_USE_SEM) + semaphore_t sem; +#else // SEM_USE_PTHREAD + dispatch_semaphore_t sem; +#endif // SEM_USE_PTHREAD + + volatile unsigned int valid:1; +}; + +int tsem_init(tsem_t *sem, int pshared, unsigned int value) { + // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem); + if (*sem) { + fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + struct tsem_s *p = (struct tsem_s*)calloc(1, sizeof(*p)); + if (!p) { + fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + +#ifdef SEM_USE_PTHREAD + int r = pthread_mutex_init(&p->lock, NULL); + do { + if (r) break; + r = pthread_cond_init(&p->cond, NULL); + if (r) { + pthread_mutex_destroy(&p->lock); + break; + } + p->val = value; + } while (0); + if (r) { + fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } +#elif defined(SEM_USE_POSIX) + static size_t tick = 0; + do { + size_t id = atomic_add_fetch_64(&tick, 1); + if (id==SEM_VALUE_MAX) { + atomic_store_64(&tick, 0); + id = 0; + } + char name[NAME_MAX-4]; + snprintf(name, sizeof(name), "/t%ld", id); + p->sem = sem_open(name, O_CREAT|O_EXCL, pshared, value); + p->id = id; + if (p->sem!=SEM_FAILED) break; + int e = errno; + if (e==EEXIST) continue; + if (e==EINTR) continue; + fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e)); + abort(); + } while (p->sem==SEM_FAILED); +#elif defined(SEM_USE_SEM) + pthread_once(&sem_once, once_init); + if (sem_inited!=1) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", basename(__FILE__), __LINE__, __func__, sem); + errno = ENOMEM; + return -1; + } + kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, 0); + if (ret != KERN_SUCCESS) { + fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem); + // we fail-fast here, because we have less-doc about semaphore_create for the moment + abort(); + } +#else // SEM_USE_PTHREAD + p->sem = dispatch_semaphore_create(value); + if (p->sem == NULL) { + fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } +#endif // SEM_USE_PTHREAD + + p->valid = 1; + + *sem = p; -int tsem_wait(dispatch_semaphore_t *sem) { - dispatch_semaphore_wait(*sem, DISPATCH_TIME_FOREVER); return 0; } -int tsem_post(dispatch_semaphore_t *sem) { - dispatch_semaphore_signal(*sem); +int tsem_wait(tsem_t *sem) { + if (!*sem) { + fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + struct tsem_s *p = *sem; + if (!p->valid) { + fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } +#ifdef SEM_USE_PTHREAD + if (pthread_mutex_lock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + p->val -= 1; + if (p->val < 0) { + if (pthread_cond_wait(&p->cond, &p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + } + if (pthread_mutex_unlock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + return 0; +#elif defined(SEM_USE_POSIX) + return sem_wait(p->sem); +#elif defined(SEM_USE_SEM) + return semaphore_wait(p->sem); +#else // SEM_USE_PTHREAD + return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER); +#endif // SEM_USE_PTHREAD +} + +int tsem_post(tsem_t *sem) { + if (!*sem) { + fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + struct tsem_s *p = *sem; + if (!p->valid) { + fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } +#ifdef SEM_USE_PTHREAD + if (pthread_mutex_lock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + p->val += 1; + if (p->val <= 0) { + if (pthread_cond_signal(&p->cond)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + } + if (pthread_mutex_unlock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + return 0; +#elif defined(SEM_USE_POSIX) + return sem_post(p->sem); +#elif defined(SEM_USE_SEM) + return semaphore_signal(p->sem); +#else // SEM_USE_PTHREAD + return dispatch_semaphore_signal(p->sem); +#endif // SEM_USE_PTHREAD +} + +int tsem_destroy(tsem_t *sem) { + // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem); + if (!*sem) { + // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + // abort(); + return 0; + } + struct tsem_s *p = *sem; + if (!p->valid) { + // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + // abort(); + return 0; + } +#ifdef SEM_USE_PTHREAD + if (pthread_mutex_lock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + p->valid = 0; + if (pthread_cond_destroy(&p->cond)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + if (pthread_mutex_unlock(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } + if (pthread_mutex_destroy(&p->lock)) { + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + abort(); + } +#elif defined(SEM_USE_POSIX) + char name[NAME_MAX-4]; + snprintf(name, sizeof(name), "/t%ld", p->id); + int r = sem_unlink(name); + if (r) { + int e = errno; + fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e)); + abort(); + } +#elif defined(SEM_USE_SEM) + semaphore_destroy(sem_port, p->sem); +#else // SEM_USE_PTHREAD +#endif // SEM_USE_PTHREAD + + p->valid = 0; + free(p); + + *sem = NULL; return 0; } -int tsem_destroy(dispatch_semaphore_t *sem) { - return 0; -} diff --git a/src/os/src/darwin/darwinTimer.c b/src/os/src/darwin/darwinTimer.c index 5fe65fb99e..ee1becc91a 100644 --- a/src/os/src/darwin/darwinTimer.c +++ b/src/os/src/darwin/darwinTimer.c @@ -13,9 +13,82 @@ * along with this program. If not, see . */ +// fail-fast or let-it-crash philosophy +// https://en.wikipedia.org/wiki/Fail-fast +// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere +// experimentally, we follow log-and-crash here + #define _DEFAULT_SOURCE #include "os.h" +#if 1 +#include + +static void (*timer_callback)(int); +static int timer_ms = 0; +static pthread_t timer_thread; +static int timer_kq = -1; +static volatile int timer_stop = 0; + +static void* timer_routine(void *arg) { + (void)arg; + + int r = 0; + struct timespec to = {0}; + to.tv_sec = timer_ms / 1000; + to.tv_nsec = (timer_ms % 1000) * 1000000; + while (!timer_stop) { + struct kevent64_s kev[10] = {0}; + r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to); + if (r!=0) { + fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__); + abort(); + } + timer_callback(SIGALRM); // just mock + } + + return NULL; +} + +int taosInitTimer(void (*callback)(int), int ms) { + int r = 0; + timer_ms = ms; + timer_callback = callback; + + timer_kq = kqueue(); + if (timer_kq==-1) { + fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__); + // since no caller of this func checks the return value for the moment + abort(); + } + + r = pthread_create(&timer_thread, NULL, timer_routine, NULL); + if (r) { + fprintf(stderr, "==%s[%d]%s()==failed to create timer thread\n", basename(__FILE__), __LINE__, __func__); + // since no caller of this func checks the return value for the moment + abort(); + } + return 0; +} + +void taosUninitTimer() { + int r = 0; + timer_stop = 1; + r = pthread_join(timer_thread, NULL); + if (r) { + fprintf(stderr, "==%s[%d]%s()==failed to join timer thread\n", basename(__FILE__), __LINE__, __func__); + // since no caller of this func checks the return value for the moment + abort(); + } + close(timer_kq); + timer_kq = -1; +} + +void taos_block_sigalrm(void) { + // we don't know if there's any specific API for SIGALRM to deliver to specific thread + // this implementation relies on kqueue rather than SIGALRM +} +#else int taosInitTimer(void (*callback)(int), int ms) { signal(SIGALRM, callback); @@ -34,3 +107,17 @@ void taosUninitTimer() { setitimer(ITIMER_REAL, &tv, NULL); } +void taos_block_sigalrm(void) { + // since SIGALRM has been used + // consideration: any better solution? + static __thread int already_set = 0; + if (!already_set) { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGALRM); + pthread_sigmask(SIG_BLOCK, &set, NULL); + already_set = 1; + } +} +#endif + diff --git a/src/os/src/darwin/eok.c b/src/os/src/darwin/eok.c new file mode 100644 index 0000000000..f731aaf330 --- /dev/null +++ b/src/os/src/darwin/eok.c @@ -0,0 +1,893 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// fail-fast or let-it-crash philosophy +// https://en.wikipedia.org/wiki/Fail-fast +// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere +// experimentally, we follow log-and-crash here + +#include "eok.h" + +#include "os.h" + +#include + +// #define BALANCE_CHECK_WHEN_CLOSE + +#ifdef ENABLE_LOG +#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) +#define E(fmt, ...) do { \ + fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + errno, strerror(errno), \ + ##__VA_ARGS__); \ +} while (0) +#else // !ENABLE_LOG +#define D(fmt, ...) (void)fmt +#define E(fmt, ...) (void)fmt +#endif // ENABLE_LOG + +#define A(statement, fmt, ...) do { \ + if (statement) break; \ + fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + #statement, errno, strerror(errno), \ + ##__VA_ARGS__); \ + abort(); \ +} while (0) + +static int eok_dummy = 0; + +typedef struct ep_over_kq_s ep_over_kq_t; +typedef struct eok_event_s eok_event_t; + +struct ep_over_kq_s { + int kq; + + // !!! + // idx in the eoks list + // used as pseudo-file-desciptor + // must be 'closed' with epoll_close + int idx; + + ep_over_kq_t *next; + + int sv[2]; // 0 for read, 1 for write + + // all registered 'epoll events, key by fd' + int evs_count; + eok_event_t *evs_head; + eok_event_t *evs_tail; + eok_event_t *evs_free; + + // all kev changes list pending to be processed by kevent64 + // key by tuple (ident,filter), ident === fd in this case + struct kevent64_s *kchanges; + int nchanges; + int ichanges; + + // kev eventslist for kevent64 to store active events + // they remain alive among kevent64 calls + struct kevent64_s *kevslist; + int nevslist; + + pthread_mutex_t lock; + + volatile unsigned int lock_valid:1; + volatile unsigned int waiting:1; + volatile unsigned int changed:1; + volatile unsigned int wakenup:1; + volatile unsigned int stopping:1; +}; + +struct eok_event_s { + int fd; + struct epoll_event epev; + volatile unsigned int changed; // 0:registered;1:add;2:mod;3:del + + eok_event_t *next; + eok_event_t *prev; +}; + +typedef struct eoks_s eoks_t; +struct eoks_s { + pthread_mutex_t lock; + ep_over_kq_t **eoks; // note: this memory leaks when process terminates + int neoks; // we can add an extra api to let user clean + ep_over_kq_t *eoks_free_list; // currently, we just keep it simple stupid +}; + +static eoks_t eoks = { + .lock = PTHREAD_MUTEX_INITIALIZER, + .eoks = NULL, + .neoks = 0, + .eoks_free_list = NULL, +}; + +#ifdef ENABLE_LOG +static const char* op_str(int op) { + switch (op) { + case EPOLL_CTL_ADD: return "EPOLL_CTL_ADD"; + case EPOLL_CTL_MOD: return "EPOLL_CTL_MOD"; + case EPOLL_CTL_DEL: return "EPOLL_CTL_DEL"; + default: return "UNKNOWN"; + } +} + +static __thread char buf_slots[10][1024] = {0}; +static __thread int buf_slots_linelen = sizeof(buf_slots[0])/sizeof(buf_slots[0][0]); +static __thread int buf_slots_count = sizeof(buf_slots)/(sizeof(buf_slots[0])/sizeof(buf_slots[0][0])); + +static const char* events_str(uint32_t events, int slots) { + A(slots>=0 && slots on linux + // EPOLLIN = 0x001, + // #define EPOLLIN EPOLLIN + // EPOLLPRI = 0x002, + // #define EPOLLPRI EPOLLPRI + // EPOLLOUT = 0x004, + // #define EPOLLOUT EPOLLOUT + // EPOLLRDNORM = 0x040, + // #define EPOLLRDNORM EPOLLRDNORM + // EPOLLRDBAND = 0x080, + // #define EPOLLRDBAND EPOLLRDBAND + // EPOLLWRNORM = 0x100, + // #define EPOLLWRNORM EPOLLWRNORM + // EPOLLWRBAND = 0x200, + // #define EPOLLWRBAND EPOLLWRBAND + // EPOLLMSG = 0x400, + // #define EPOLLMSG EPOLLMSG + // EPOLLERR = 0x008, + // #define EPOLLERR EPOLLERR + // EPOLLHUP = 0x010, + // #define EPOLLHUP EPOLLHUP + // EPOLLRDHUP = 0x2000, + // #define EPOLLRDHUP EPOLLRDHUP + // EPOLLEXCLUSIVE = 1u << 28, + // #define EPOLLEXCLUSIVE EPOLLEXCLUSIVE + // EPOLLWAKEUP = 1u << 29, + // #define EPOLLWAKEUP EPOLLWAKEUP + // EPOLLONESHOT = 1u << 30, + // #define EPOLLONESHOT EPOLLONESHOT + // EPOLLET = 1u << 31 + // #define EPOLLET EPOLLET +#define CHK_EV(ev) \ + if (len>0 && (events & (ev))==(ev)) { \ + n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \ + p += n; \ + len -= n; \ + } + CHK_EV(EPOLLIN); + CHK_EV(EPOLLPRI); + CHK_EV(EPOLLOUT); + CHK_EV(EPOLLRDNORM); + CHK_EV(EPOLLRDBAND); + CHK_EV(EPOLLWRNORM); + CHK_EV(EPOLLWRBAND); + CHK_EV(EPOLLMSG); + CHK_EV(EPOLLERR); + CHK_EV(EPOLLHUP); + CHK_EV(EPOLLRDHUP); + CHK_EV(EPOLLEXCLUSIVE); + CHK_EV(EPOLLWAKEUP); + CHK_EV(EPOLLONESHOT); + CHK_EV(EPOLLET); +#undef CHK_EV + return buf; +} + +static const char* kev_flags_str(uint16_t flags, int slots) { + A(slots>=0 && slots + // #define EV_ADD 0x0001 /* add event to kq (implies enable) */ + // #define EV_DELETE 0x0002 /* delete event from kq */ + // #define EV_ENABLE 0x0004 /* enable event */ + // #define EV_DISABLE 0x0008 /* disable event (not reported) */ + // /* flags */ + // #define EV_ONESHOT 0x0010 /* only report one occurrence */ + // #define EV_CLEAR 0x0020 /* clear event state after reporting */ + // #define EV_RECEIPT 0x0040 /* force immediate event output */ + // /* ... with or without EV_ERROR */ + // /* ... use KEVENT_FLAG_ERROR_EVENTS */ + // /* on syscalls supporting flags */ + // #define EV_DISPATCH 0x0080 /* disable event after reporting */ + // #define EV_UDATA_SPECIFIC 0x0100 /* unique kevent per udata value */ + // #define EV_DISPATCH2 (EV_DISPATCH | EV_UDATA_SPECIFIC) + // /* ... in combination with EV_DELETE */ + // /* will defer delete until udata-specific */ + // /* event enabled. EINPROGRESS will be */ + // /* returned to indicate the deferral */ + // #define EV_VANISHED 0x0200 /* report that source has vanished */ + // /* ... only valid with EV_DISPATCH2 */ + // #define EV_SYSFLAGS 0xF000 /* reserved by system */ + // #define EV_FLAG0 0x1000 /* filter-specific flag */ + // #define EV_FLAG1 0x2000 /* filter-specific flag */ + // /* returned values */ + // #define EV_EOF 0x8000 /* EOF detected */ + // #define EV_ERROR 0x4000 /* error, data contains errno */ +#define CHK_EV(ev) \ + if (len>0 && (flags & (ev))==(ev)) { \ + n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \ + p += n; \ + len -= n; \ + } + CHK_EV(EV_ADD); + CHK_EV(EV_DELETE); + CHK_EV(EV_ENABLE); + CHK_EV(EV_DISABLE); + CHK_EV(EV_ONESHOT); + CHK_EV(EV_CLEAR); + CHK_EV(EV_RECEIPT); + CHK_EV(EV_DISPATCH); + CHK_EV(EV_UDATA_SPECIFIC); + CHK_EV(EV_DISPATCH2); + CHK_EV(EV_VANISHED); + CHK_EV(EV_SYSFLAGS); + CHK_EV(EV_FLAG0); + CHK_EV(EV_FLAG1); + CHK_EV(EV_EOF); + CHK_EV(EV_ERROR); +#undef CHK_EV + return buf; +} +#endif // ENABLE_LOG + +static ep_over_kq_t* eoks_alloc(void); +static void eoks_free(ep_over_kq_t *eok); +static ep_over_kq_t* eoks_find(int epfd); + +static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd); +static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok); +static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev); +static void eok_wakeup(ep_over_kq_t *eok); + +static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev); + +static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents); + +int epoll_create(int size) { + (void)size; + int e = 0; + ep_over_kq_t *eok = eoks_alloc(); + if (!eok) return -1; + + A(eok->kq==-1, "internal logic error"); + A(eok->lock_valid, "internal logic error"); + A(eok->idx>=0 && eok->idxnext==NULL, "internal logic error"); + A(eok->sv[0]==-1, "internal logic error"); + A(eok->sv[1]==-1, "internal logic error"); + + eok->kq = kqueue(); + if (eok->kq==-1) { + e = errno; + eoks_free(eok); + errno = e; + return -1; + } + + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, eok->sv)) { + e = errno; + eoks_free(eok); + errno = e; + return -1; + } + + struct epoll_event ev = {0}; + ev.events = EPOLLIN; + ev.data.ptr = &eok_dummy; + D("epoll_create epfd:[%d] and sv0[%d]", eok->idx, eok->sv[0]); + if (epoll_ctl(eok->idx, EPOLL_CTL_ADD, eok->sv[0], &ev)) { + e = errno; + epoll_close(eok->idx); + errno = e; + return -1; + } + + return eok->idx; +} + +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { + D("epoll_ctling epfd:[%d], op:[%s], fd:[%d], events:[%04x:%s]", + epfd, op_str(op), fd, + event ? event->events : 0, + event ? events_str(event->events, 0) : "NULL"); + int e = 0; + if (epfd<0 || epfd>=eoks.neoks) { + errno = EBADF; + return -1; + } + if (fd==-1) { + errno = EBADF; + return -1; + } + if (event && !(event->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLOUT))) { + e = ENOTSUP; + return -1; + } + + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + return -1; + } + + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + eok_event_t* oev = eok_find_ev(eok, fd); + if (op==EPOLL_CTL_ADD && oev) { + e = EEXIST; + break; + } + if (op!=EPOLL_CTL_ADD && !oev) { + e = ENOENT; + break; + } + if (op!=EPOLL_CTL_DEL && !event) { + e = EINVAL; + break; + } + + // prepare krev/kwev + struct kevent64_s krev = {0}; + struct kevent64_s kwev = {0}; + krev.ident = -1; + kwev.ident = -1; + uint16_t flags = 0; + // prepare internal eok event + eok_event_t ev = {0}; + ev.fd = fd; + if (event) ev.epev = *event; + struct epoll_event *pev = event; + switch (op) { + case EPOLL_CTL_ADD: { + flags = EV_ADD; + ev.changed = 1; + } break; + case EPOLL_CTL_MOD: { + flags = EV_ADD; + ev.changed = 2; + } break; + case EPOLL_CTL_DEL: { + // event is ignored + // pev points to registered epoll_event + pev = &oev->epev; + flags = EV_DELETE; + ev.changed = 3; + } break; + default: { + e = ENOTSUP; + } break; + } + + if (e) break; + + // udata will be delayed to be set + if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + flags |= EV_EOF; + EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0); + } + if (pev->events & EPOLLOUT) { + EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0); + } + + // refresh registered evlist and changelist in a transaction way + if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) { + e = errno; + A(e, "internal logic error"); + break; + } + eok->changed = 1; + eok_wakeup(eok); + } while (0); + A(0==pthread_mutex_unlock(&eok->lock), ""); + + if (e) { + errno = e; + return -1; + } + + return 0; +} + +static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to); + +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) { + taos_block_sigalrm(); + + int e = 0; + if (!events) { + errno = EINVAL; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); + return -1; + } + if (maxevents<=0) { + errno = EINVAL; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); + return -1; + } + + struct timespec abstime = {0}; + A(TIME_UTC==timespec_get(&abstime, TIME_UTC), "internal logic error"); + + if (timeout!=-1) { + if (timeout<0) timeout = 0; + int64_t t = abstime.tv_nsec + timeout * 1000000; + abstime.tv_sec += t / 1000000000; + abstime.tv_nsec = t % 1000000000; + } + + int r = 0; + + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); + errno = EBADF; + return -1; + } + + int cnts = 0; + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + cnts = 0; + A(eok->waiting==0, "internal logic error"); + struct kevent64_s *eventslist = eok_alloc_eventslist(eok, maxevents); + if (!eventslist) { + e = ENOMEM; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); + break; + } + memset(eventslist, 0, maxevents * sizeof(*eventslist)); + + struct timespec now = {0}; + A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error"); + struct timespec to = do_timespec_diff(&now, &abstime); + struct timespec *pto = &to; + if (timeout==-1) { + pto = NULL; + } + + // taking the changelist + struct kevent64_s *kchanges = eok->kchanges; + int nchanges = eok->nchanges; + int ichanges = eok->ichanges; + // let outside world to add changes + eok->kchanges = NULL; + eok->nchanges = 0; + eok->ichanges = 0; + + eok->changed = 0; + eok->wakenup = 0; + eok->waiting = 1; + + A(0==pthread_mutex_unlock(&eok->lock), ""); + if (ichanges>0) { + D("kevent64 epfd[%d] changing [%d] changes and waiting...", eok->idx, ichanges); + } + errno = 0; + r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto); + e = errno; + if (e) { + E("kevent64 epfd[%d] waiting done, with r[%d]", eok->idx, r); + } + A(0==pthread_mutex_lock(&eok->lock), ""); + + eok->waiting = 0; + + if (kchanges) { + if (eok->kchanges==NULL) { + // reuse + A(eok->nchanges==0 && eok->ichanges==0, "internal logic error"); + eok->kchanges = kchanges; + eok->nchanges = nchanges; + } else { + free(kchanges); + kchanges = NULL; + } + nchanges = 0; + ichanges = 0; + } + + if (r==0) { + A(timeout!=-1, "internal logic error"); + } + for (int i=0; iudata && eok->evs_head && eok->evs_tail, "internal logic error"); + eok_event_t *ev = (eok_event_t*)kev->udata; + A(kev->ident == ev->fd, "internal logic error"); + if (kev->flags & EV_ERROR) { + D("epfd[%d] error when processing change list for fd[%d], error[%s], kev_flags:[%04x:%s]", + epfd, ev->fd, strerror(kev->data), kev->flags, kev_flags_str(kev->flags, 0)); + } + switch (kev->filter) { + case EVFILT_READ: { + A((ev->epev.events & EPOLLIN), "internal logic errro"); + if (ev->epev.data.ptr==&eok_dummy) { + // it's coming from wakeup socket pair + char c = '\0'; + A(1==recv(kev->ident, &c, 1, 0), "internal logic error"); + A(0==memcmp(&c, "1", 1), "internal logic error"); + D("epfd[%d] wokenup", epfd); + continue; + } else { + if (ev->changed==3) { + D("epfd[%d] already requested to delete for fd[%d]", epfd, ev->fd); + // TODO: write a unit test for this case + // EV_DELETE? + continue; + } + // converting to epoll_event + // we shall collect all kevents for the uniq fd into one epoll_evnt + // but currently, taos never use EPOLLOUT + // just let it this way for the moment + struct epoll_event pev = {0}; + pev.data.ptr = ev->epev.data.ptr; + pev.events = EPOLLIN; + if (kev->flags & EV_EOF) { + // take all these as EOF for the moment + pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP); + } + // rounded to what user care + pev.events = pev.events & ev->epev.events; + D("epfd[%d] events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]", + epfd, + ev->fd, pev.events, events_str(pev.events, 0), + ev->epev.events, events_str(ev->epev.events, 1), + kev->flags, kev_flags_str(kev->flags, 2)); + // now we get ev and store it + events[cnts++] = pev; + } + } break; + case EVFILT_WRITE: { + A(0, "not implemented yet"); + } break; + default: { + A(0, "internal logic error"); + } break; + } + } + if (r>=0) { + // we can safely rule out delete-requested-events from the regitered evlists + // if only changelist are correctly registered + eok_event_t *p = eok->evs_head; + while (p) { + eok_event_t *next = p->next; + if (p->changed==3) { + D("epfd[%d] removing registered event for fd[%d]: [%04x:%s]", epfd, p->fd, p->epev.events, events_str(p->epev.events, 0)); + eok_free_ev(eok, p); + } + p = next; + } + } + if (cnts==0) { + // if no user-cared-events is up + // we check to see if time is up + if (timeout!=-1) { + A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error"); + to = do_timespec_diff(&now, &abstime); + if (to.tv_sec==0 && to.tv_nsec==0) break; + } + // time is not up yet, continue loop + } + } while (cnts==0); + if (cnts>0) { + D("kevent64 epfd[%d] waiting done with [%d] events", epfd, cnts); + } + A(0==pthread_mutex_unlock(&eok->lock), ""); + + if (e) { + errno = e; + E("epfd[%d] epoll_wait failed", epfd); + return -1; + } + + // tell user how many events are valid + return cnts; +} + +static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to) { + struct timespec delta; + delta.tv_sec = to->tv_sec - from->tv_sec; + delta.tv_nsec = to->tv_nsec - from->tv_nsec; + // norm and round up + while (delta.tv_nsec<0) { + delta.tv_sec -= 1; + delta.tv_nsec += 1000000000; + } + if (delta.tv_sec < 0) { + delta.tv_sec = 0; + delta.tv_nsec = 0; + } + return delta; +} + +int epoll_close(int epfd) { + D("epoll_closing epfd: [%d]", epfd); + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + return -1; + } + + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + // panic if it would be double-closed + A(eok->stopping==0, "internal logic error"); + eok->stopping = 1; + // panic if epoll_wait is pending + A(eok->waiting==0, "internal logic error"); + + if (eok->kq!=-1) { + close(eok->kq); + eok->kq = -1; + } + } while (0); + A(0==pthread_mutex_unlock(&eok->lock), ""); + eoks_free(eok); + + return 0; +} + +static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents) { + if (maxevents<=eok->nevslist) return eok->kevslist; + struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kevslist, sizeof(*p)*maxevents); + if (!p) return NULL; + eok->kevslist = p; + eok->nevslist = maxevents; + return p; +} + +static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd) { + eok_event_t *p = eok->evs_head; + while (p) { + if (p->fd == fd) return p; + p = p->next; + } + errno = ENOENT; + return NULL; +} + +static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) { + eok_event_t *p = NULL; + if (eok->evs_free) { + p = eok->evs_free; + eok->evs_free = p->next; + p->next = NULL; + A(p->prev==NULL, "internal logic error"); + } else { + p = (eok_event_t*)calloc(1, sizeof(*p)); + if (!p) return NULL; + A(p->next==NULL, "internal logic error"); + A(p->prev==NULL, "internal logic error"); + } + // dirty link + p->prev = eok->evs_tail; + if (eok->evs_tail) eok->evs_tail->next = p; + else eok->evs_head = p; + eok->evs_tail = p; + + eok->evs_count += 1; + + return p; +} + +static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) { + if (ev->prev) ev->prev->next = ev->next; + else eok->evs_head = ev->next; + if (ev->next) ev->next->prev = ev->prev; + else eok->evs_tail = ev->prev; + ev->prev = NULL; + ev->next = eok->evs_free; + eok->evs_free = ev; + + eok->evs_count -= 1; +} + +static void eok_wakeup(ep_over_kq_t *eok) { + if (!eok->waiting) return; + if (eok->wakenup) return; + eok->wakenup = 1; + A(1==send(eok->sv[1], "1", 1, 0), ""); +} + +static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev) { + if (!oev) oev = eok_calloc_ev(eok); + if (!oev) return -1; + int n = 0; + if (krev->ident==ev->fd) ++n; + if (kwev->ident==ev->fd) ++n; + A(n, "internal logic error"); + if (eok->ichanges+n>eok->nchanges) { + struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kchanges, sizeof(*p) * (eok->nchanges + 10)); + if (!p) { + if (ev->changed==1) { + // roll back + A(oev, "internal logic error"); + eok_free_ev(eok, oev); + } + errno = ENOMEM; + return -1; + } + eok->kchanges = p; + eok->nchanges += 10; + } + + // copy to registered event slot + oev->fd = ev->fd; + if (ev->changed!=3) { + // if add/mod, copy epoll_event + oev->epev = ev->epev; + } + oev->changed = ev->changed; + + // copy to changes list + n = 0; + if (krev->ident==ev->fd) { + krev->udata = (uint64_t)oev; + eok->kchanges[eok->ichanges++] = *krev; + ++n; + } + if (kwev->ident==ev->fd) { + kwev->udata = (uint64_t)oev; + eok->kchanges[eok->ichanges++] = *kwev; + ++n; + } + D("epfd[%d]: add #changes[%d] for fd[%d], and now #changes/registers [%d/%d]", eok->idx, n, ev->fd, eok->ichanges, eok->evs_count); + return 0; +} + +static ep_over_kq_t* eoks_alloc(void) { + ep_over_kq_t *eok = NULL; + + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + if (eoks.eoks_free_list) { + eok = eoks.eoks_free_list; + eoks.eoks_free_list = eok->next; + A(eoks.eoks, "internal logic error"); + A(eok->idx>=0 && eok->idxidx)==NULL, "internal logic error"); + *(eoks.eoks + eok->idx) = eok; + eok->next = NULL; + eok->stopping = 0; + break; + } + eok = (ep_over_kq_t*)calloc(1, sizeof(*eok)); + if (!eok) break; + eok->idx = -1; + ep_over_kq_t **ar = (ep_over_kq_t**)realloc(eoks.eoks, sizeof(**ar) * (eoks.neoks+1)); + if (!ar) break; + eoks.eoks = ar; + *(eoks.eoks + eoks.neoks) = eok; + eok->idx = eoks.neoks; + eok->kq = -1; + eok->sv[0] = -1; + eok->sv[1] = -1; + eoks.neoks += 1; + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); + + if (!eok) { + errno = ENOMEM; + return NULL; + } + if (eok->idx==-1) { + free(eok); + errno = ENOMEM; + return NULL; + } + if (eok->lock_valid) { + return eok; + } + + if (0==pthread_mutex_init(&eok->lock, NULL)) { + eok->lock_valid = 1; + return eok; + } + + eoks_free(eok); + errno = ENOMEM; + return NULL; +} + +static void eoks_free(ep_over_kq_t *eok) { + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + A(eok->idx>=0 && eok->idxnext==NULL, "internal logic error"); + + // leave eok->kchanges as is + A(eok->ichanges==0, "internal logic error"); + + A(eok->waiting==0, "internal logic error"); + eok_event_t *ev = eok->evs_head; + int sv_closed = 0; + while (ev) { + eok_event_t *next = ev->next; + if (ev->fd==eok->sv[0]) { + // fd is critical system resource + A(sv_closed==0, "internal logic error"); + close(eok->sv[0]); + eok->sv[0] = -1; + close(eok->sv[1]); + eok->sv[1] = -1; + eok_free_ev(eok, ev); + } else { + // user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close? + // calling close(ev->fd) here smells really bad +#ifndef BALANCE_CHECK_WHEN_CLOSE + // we just let it be and reclaim ev + eok_free_ev(eok, ev); +#else + // panic otherwise, if BALANCE_CHECK_WHEN_CLOSE is defined + A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0, + "epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?", + eok->idx, ev->fd); +#endif + } + ev = next; + } + // if (eok->evs_count==1) { + // A(eok->evs_head && eok->evs_tail && eok->evs_head==eok->evs_tail, "internal logic error"); + // A(eok->evs_head->fd==eok->sv[0] && eok->sv[0]!=-1 && eok->sv[1]!=-1, "internal logic error"); + // // fd is critical system resource + // close(eok->sv[0]); + // eok->sv[0] = -1; + // close(eok->sv[1]); + // eok->sv[1] = -1; + // eok_free_ev(eok, eok->evs_head); + // } + A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0, + "internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?"); + A(eok->sv[0]==-1 && eok->sv[1]==-1, "internal logic error"); + if (eok->kq!=-1) { + close(eok->kq); + eok->kq = -1; + } + eok->next = eoks.eoks_free_list; + eoks.eoks_free_list = eok; + *(eoks.eoks + eok->idx) = NULL; + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); +} + +static ep_over_kq_t* eoks_find(int epfd) { + ep_over_kq_t *eok = NULL; + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + if (epfd<0 || epfd>=eoks.neoks) { + break; + } + A(eoks.eoks, "internal logic error"); + eok = *(eoks.eoks + epfd); + A(eok->next==NULL, "internal logic error"); + A(eok->lock_valid, "internal logic error"); + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); + return eok; +} + diff --git a/src/os/src/detail/CMakeLists.txt b/src/os/src/detail/CMakeLists.txt index e4052a4fb3..1c5e55a522 100644 --- a/src/os/src/detail/CMakeLists.txt +++ b/src/os/src/detail/CMakeLists.txt @@ -13,3 +13,4 @@ TARGET_LINK_LIBRARIES(osdetail os) IF (TD_ARM_32 OR TD_LINUX_32) TARGET_LINK_LIBRARIES(osdetail atomic) ENDIF () + diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c index 729471247f..659c8510dc 100644 --- a/src/os/src/detail/osSocket.c +++ b/src/os/src/detail/osSocket.c @@ -73,4 +73,4 @@ const char *taosInetNtoa(struct in_addr ipInt) { return inet_ntoa(ipInt); } -#endif \ No newline at end of file +#endif diff --git a/src/os/src/detail/osTimer.c b/src/os/src/detail/osTimer.c index 1d3ba30def..c1135ce292 100644 --- a/src/os/src/detail/osTimer.c +++ b/src/os/src/detail/osTimer.c @@ -116,6 +116,9 @@ void taosUninitTimer() { pthread_sigmask(SIG_BLOCK, &set, NULL); */ void taosMsleep(int mseconds) { +#ifdef __APPLE__ + taos_block_sigalrm(); +#endif // __APPLE__ #if 1 usleep(mseconds * 1000); #else @@ -136,6 +139,7 @@ void taosMsleep(int mseconds) { /* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ #endif + } -#endif \ No newline at end of file +#endif diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 56c3c25d7c..274f50d24a 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -8,14 +8,29 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) - + IF (TD_LINUX) ADD_LIBRARY(http ${SRC}) TARGET_LINK_LIBRARIES(http z) IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(http taos_static) - ELSE () + ELSE () + TARGET_LINK_LIBRARIES(http taos) + ENDIF () + + IF (TD_ADMIN) + TARGET_LINK_LIBRARIES(http admin) + ENDIF () +ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(http ${SRC}) + TARGET_LINK_LIBRARIES(http z) + + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(http taos_static) + ELSE () TARGET_LINK_LIBRARIES(http taos) ENDIF () diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index bc768788d8..64dc1514fa 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -31,11 +31,36 @@ static bool httpReadData(HttpContext *pContext); +#ifdef __APPLE__ +static int sv_dummy = 0; +#endif // __APPLE__ + static void httpStopThread(HttpThread* pThread) { pThread->stop = true; // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed +#ifdef __APPLE__ + int sv[2]; + sv[0] = sv[1] = -1; + int r = socketpair(PF_LOCAL, SOCK_STREAM, 0, sv); + do { + if (r) break; + struct epoll_event ev = {0}; + ev.events = EPOLLIN; + ev.data.ptr = &sv_dummy; + pThread->stop = true; + r = epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, sv[0], &ev); + if (r) break; + if (1!=send(sv[1], "1", 1, 0)) { + r = -1; + break; + } + } while (0); + if (r) { + pthread_cancel(pThread->thread); + } +#else // __APPLE__ struct epoll_event event = { .events = EPOLLIN }; eventfd_t fd = eventfd(1, 0); if (fd == -1) { @@ -46,13 +71,29 @@ static void httpStopThread(HttpThread* pThread) { httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno)); pthread_cancel(pThread->thread); } +#endif // __APPLE__ pthread_join(pThread->thread, NULL); +#ifdef __APPLE__ + if (sv[0]!=-1) { + close(sv[0]); + sv[0] = -1; + } + if (sv[1]!=-1) { + close(sv[1]); + sv[1] = -1; + } +#else // __APPLE__ if (fd != -1) { close(fd); } +#endif // __APPLE__ +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else close(pThread->pollFd); +#endif pthread_mutex_destroy(&(pThread->threadMutex)); } @@ -96,6 +137,15 @@ static void httpProcessHttpData(void *param) { if (fdNum <= 0) continue; for (int32_t i = 0; i < fdNum; ++i) { +#ifdef __APPLE__ + if (events[i].data.ptr == &sv_dummy) { + // no need to drain the recv buffer of sv[0] + // since there's only one time to send at most 1 byte to sv[0] + // btw, pThread->stop shall be already set, thus never reached here + httpDebug("if you see this line, there's internal logic error"); + continue; + } +#endif // __APPLE__ pContext = httpGetContext(events[i].data.ptr); if (pContext == NULL) { httpError("context:%p, is already released, close connect", events[i].data.ptr); diff --git a/src/plugins/monitor/CMakeLists.txt b/src/plugins/monitor/CMakeLists.txt index 90189c9d75..6b26c0447a 100644 --- a/src/plugins/monitor/CMakeLists.txt +++ b/src/plugins/monitor/CMakeLists.txt @@ -8,10 +8,20 @@ AUX_SOURCE_DIRECTORY(./src SRC) IF (TD_LINUX) ADD_LIBRARY(monitor ${SRC}) - + IF (TD_SOMODE_STATIC) TARGET_LINK_LIBRARIES(monitor taos_static) ELSE () TARGET_LINK_LIBRARIES(monitor taos) ENDIF () -ENDIF () +ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(monitor ${SRC}) + + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(monitor taos_static) + ELSE () + TARGET_LINK_LIBRARIES(monitor taos) + ENDIF () +ENDIF () diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt index 3761f70134..b6de421517 100644 --- a/src/plugins/mqtt/CMakeLists.txt +++ b/src/plugins/mqtt/CMakeLists.txt @@ -18,3 +18,15 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(mqtt taos) ENDIF () ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(mqtt ${SRC}) + TARGET_LINK_LIBRARIES(mqtt cJson mqttc) + + IF (TD_SOMODE_STATIC) + TARGET_LINK_LIBRARIES(mqtt taos_static) + ELSE () + TARGET_LINK_LIBRARIES(mqtt taos) + ENDIF () +ENDIF () + diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index e403251858..967e86de3c 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -14,3 +14,8 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(query m rt) ADD_SUBDIRECTORY(tests) ENDIF () + +IF (TD_DARWIN) + TARGET_LINK_LIBRARIES(query m) + ADD_SUBDIRECTORY(tests) +ENDIF () diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 6cdf3eff9a..ff44a2497a 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -307,7 +307,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); pthread_attr_destroy(&thattr); if (code != 0) { +#ifdef __APPLE__ + if (pThreadObj->pollFd!=-1) { + epoll_close(pThreadObj->pollFd); + pThreadObj->pollFd = -1; + } +#else // __APPLE__ taosCloseSocket(pThreadObj->pollFd); +#endif // __APPLE__ free(pThreadObj); terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); @@ -470,7 +477,10 @@ static void *taosProcessTcpData(void *param) { SFdObj *pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; - + +#ifdef __APPLE__ + taos_block_sigalrm(); +#endif // __APPLE__ while (1) { int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); if (pThreadObj->stop) { @@ -512,7 +522,14 @@ static void *taosProcessTcpData(void *param) { if (pThreadObj->stop) break; } +#ifdef __APPLE__ + if (pThreadObj->pollFd >=0) { + epoll_close(pThreadObj->pollFd); + pThreadObj->pollFd = -1; + } +#else // __APPLE__ if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); +#endif // __APPLE__ while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt index 9a4bcc353d..e923105860 100644 --- a/src/rpc/test/CMakeLists.txt +++ b/src/rpc/test/CMakeLists.txt @@ -16,3 +16,17 @@ IF (TD_LINUX) ADD_EXECUTABLE(rserver ${SERVER_SRC}) TARGET_LINK_LIBRARIES(rserver trpc) ENDIF () + +IF (TD_DARWIN) + LIST(APPEND CLIENT_SRC ./rclient.c) + ADD_EXECUTABLE(rclient ${CLIENT_SRC}) + TARGET_LINK_LIBRARIES(rclient trpc) + + LIST(APPEND SCLIENT_SRC ./rsclient.c) + ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) + TARGET_LINK_LIBRARIES(rsclient trpc) + + LIST(APPEND SERVER_SRC ./rserver.c) + ADD_EXECUTABLE(rserver ${SERVER_SRC}) + TARGET_LINK_LIBRARIES(rserver trpc) +ENDIF () diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt index 6a53380841..3721163f79 100644 --- a/src/sync/CMakeLists.txt +++ b/src/sync/CMakeLists.txt @@ -3,7 +3,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) - + IF (TD_LINUX) LIST(REMOVE_ITEM SRC src/syncArbitrator.c) ADD_LIBRARY(sync ${SRC}) @@ -16,3 +16,16 @@ IF (TD_LINUX) #ADD_SUBDIRECTORY(test) ENDIF () + +IF (TD_DARWIN) + LIST(REMOVE_ITEM SRC src/syncArbitrator.c) + ADD_LIBRARY(sync ${SRC}) + TARGET_LINK_LIBRARIES(sync tutil pthread common) + + LIST(APPEND BIN_SRC src/syncArbitrator.c) + LIST(APPEND BIN_SRC src/syncTcp.c) + ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) + TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) + + #ADD_SUBDIRECTORY(test) +ENDIF () diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index cb2379583f..7678a7d284 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include +// #include #include "os.h" #include "taoserror.h" #include "tlog.h" diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 4744666737..e15f71e905 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -233,7 +233,11 @@ static void *syncProcessTcpData(void *param) { sDebug("%p TCP epoll thread exits", pThread); +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else // __APPLE__ close(pThread->pollFd); +#endif // __APPLE__ tfree(pThread); tfree(buffer); return NULL; @@ -290,7 +294,11 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { pthread_attr_destroy(&thattr); if (ret != 0) { +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else // __APPLE__ close(pThread->pollFd); +#endif // __APPLE__ tfree(pThread); return NULL; } diff --git a/src/sync/test/CMakeLists.txt b/src/sync/test/CMakeLists.txt index 256e87580d..ab2e6c307b 100644 --- a/src/sync/test/CMakeLists.txt +++ b/src/sync/test/CMakeLists.txt @@ -13,4 +13,3 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(syncServer sync trpc common) ENDIF () - diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5067974903..f3e69d9210 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -233,7 +233,11 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; +#ifdef __APPLE__ + sem_t *readyToCommit; +#else // __APPLE__ sem_t readyToCommit; +#endif // __APPLE__ pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code @@ -616,4 +620,4 @@ int tsdbScheduleCommit(STsdbRepo *pRepo); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 696270d670..1c8a661c0a 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -166,7 +166,11 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { pRepo->imem = NULL; tsdbUnlockRepo(pRepo); tsdbUnRefMemTable(pRepo, pIMem); +#ifdef __APPLE__ + sem_post(pRepo->readyToCommit); +#else // __APPLE__ sem_post(&(pRepo->readyToCommit)); +#endif // __APPLE__ } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b34b2fa9e6..c770f0ff42 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -146,7 +146,11 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); +#else // __APPLE__ sem_wait(&(pRepo->readyToCommit)); +#endif // __APPLE__ terrno = pRepo->code; } tsdbUnRefMemTable(pRepo, pRepo->mem); @@ -643,11 +647,21 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } - code = sem_init(&(pRepo->readyToCommit), 0, 1); - if (code != 0) { +#ifdef __APPLE__ + pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1); + if (pRepo->readyToCommit==SEM_FAILED) { + code = errno; terrno = TAOS_SYSTEM_ERROR(code); goto _err; } +#else // __APPLE__ + code = sem_init(&(pRepo->readyToCommit), 0, 1); + if (code != 0) { + code = errno; + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } +#endif // __APPLE__ pRepo->repoLocked = false; @@ -693,7 +707,11 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); tfree(pRepo->rootDir); +#ifdef __APPLE__ + sem_close(pRepo->readyToCommit); +#else // __APPLE__ sem_destroy(&(pRepo->readyToCommit)); +#endif // __APPLE__ pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 07f001f68a..206a3332d5 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -207,7 +207,11 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { if (pRepo->mem == NULL) return 0; +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); +#else // __APPLE__ sem_wait(&(pRepo->readyToCommit)); +#endif // __APPLE__ ASSERT(pRepo->imem == NULL); @@ -229,8 +233,13 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; tsdbAsyncCommit(pRepo); +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); + sem_post(pRepo->readyToCommit); +#else // __APPLE__ sem_wait(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit)); +#endif // __APPLE__ if (pRepo->code != TSDB_CODE_SUCCESS) { terrno = pRepo->code; @@ -927,4 +936,4 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow } return 0; -} \ No newline at end of file +} diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 48b4d76561..3606aea76b 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -5,7 +5,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) - + IF (TD_LINUX) TARGET_LINK_LIBRARIES(tutil m rt) # ADD_SUBDIRECTORY(tests) @@ -28,5 +28,6 @@ IF (TD_LINUX) ELSEIF (TD_WINDOWS) TARGET_LINK_LIBRARIES(tutil iconv regex winmm IPHLPAPI ws2_32 wepoll) ELSEIF(TD_DARWIN) + TARGET_LINK_LIBRARIES(tutil m) TARGET_LINK_LIBRARIES(tutil iconv) ENDIF() diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 49b69ea0a1..4bfd7a855b 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -354,6 +354,8 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) { return -1; } +#ifndef __APPLE__ + // all fails on macosx int32_t probes = 3; if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); @@ -374,6 +376,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) { taosCloseSocket(sockFd); return -1; } +#endif // __APPLE__ int32_t nodelay = 1; if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { diff --git a/src/vnode/CMakeLists.txt b/src/vnode/CMakeLists.txt index c953883361..3a26172411 100644 --- a/src/vnode/CMakeLists.txt +++ b/src/vnode/CMakeLists.txt @@ -13,3 +13,9 @@ IF (TD_LINUX) ADD_LIBRARY(vnode ${SRC}) TARGET_LINK_LIBRARIES(vnode tsdb tcq) ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(vnode ${SRC}) + TARGET_LINK_LIBRARIES(vnode tsdb tcq) +ENDIF () + diff --git a/src/wal/CMakeLists.txt b/src/wal/CMakeLists.txt index 681bed5425..d3cd86df50 100644 --- a/src/wal/CMakeLists.txt +++ b/src/wal/CMakeLists.txt @@ -8,4 +8,10 @@ IF (TD_LINUX) ADD_LIBRARY(twal ${SRC}) TARGET_LINK_LIBRARIES(twal tutil common) ADD_SUBDIRECTORY(test) -ENDIF () +ENDIF () + +IF (TD_DARWIN) + ADD_LIBRARY(twal ${SRC}) + TARGET_LINK_LIBRARIES(twal tutil common) + ADD_SUBDIRECTORY(test) +ENDIF () diff --git a/src/wal/test/CMakeLists.txt b/src/wal/test/CMakeLists.txt index b8338b1738..aec0602ac0 100644 --- a/src/wal/test/CMakeLists.txt +++ b/src/wal/test/CMakeLists.txt @@ -10,4 +10,12 @@ IF (TD_LINUX) ENDIF () +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND WALTEST_SRC ./waltest.c) + ADD_EXECUTABLE(waltest ${WALTEST_SRC}) + TARGET_LINK_LIBRARIES(waltest twal osdetail tutil) + +ENDIF () diff --git a/tests/comparisonTest/tdengine/CMakeLists.txt b/tests/comparisonTest/tdengine/CMakeLists.txt index aaa18592ed..a12e36ab6b 100644 --- a/tests/comparisonTest/tdengine/CMakeLists.txt +++ b/tests/comparisonTest/tdengine/CMakeLists.txt @@ -5,3 +5,8 @@ IF (TD_LINUX) add_executable(tdengineTest tdengineTest.c) target_link_libraries(tdengineTest taos_static tutil common pthread) ENDIF() + +IF (TD_DARWIN) + add_executable(tdengineTest tdengineTest.c) + target_link_libraries(tdengineTest taos_static tutil common pthread) +ENDIF() diff --git a/tests/examples/c/CMakeLists.txt b/tests/examples/c/CMakeLists.txt index 9e879d4c4c..954fe468b1 100644 --- a/tests/examples/c/CMakeLists.txt +++ b/tests/examples/c/CMakeLists.txt @@ -5,4 +5,14 @@ IF (TD_LINUX) AUX_SOURCE_DIRECTORY(. SRC) ADD_EXECUTABLE(demo apitest.c) TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) + ADD_EXECUTABLE(epoll epoll.c) + TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread ) +ENDIF () +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(. ${TD_COMMUNITY_DIR}/src/inc ${TD_COMMUNITY_DIR}/src/client/inc ${TD_COMMUNITY_DIR}/inc) + AUX_SOURCE_DIRECTORY(. SRC) + ADD_EXECUTABLE(demo demo.c) + TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) + ADD_EXECUTABLE(epoll epoll.c) + TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread ) ENDIF () diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 9cbcee45a2..45ec546803 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -86,7 +86,7 @@ void Test(TAOS *taos, char *qstr, int index) { int i = 0; for (i = 0; i < 10; ++i) { - sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", 1546300800000 + i * 1000, i, i, i, i*10000000, i*1.0, i*2.0, "hello"); + sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello"); printf("qstr: %s\n", qstr); // note: how do you wanna do if taos_query returns non-NULL diff --git a/tests/examples/c/epoll.c b/tests/examples/c/epoll.c new file mode 100644 index 0000000000..4f65ea478e --- /dev/null +++ b/tests/examples/c/epoll.c @@ -0,0 +1,320 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef __APPLE__ +#include "eok.h" +#else // __APPLE__ +#include +#endif // __APPLE__ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) +#define A(statement, fmt, ...) do { \ + if (statement) break; \ + fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + #statement, errno, strerror(errno), \ + ##__VA_ARGS__); \ + abort(); \ +} while (0) + +#define E(fmt, ...) do { \ + fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + errno, strerror(errno), \ + ##__VA_ARGS__); \ +} while (0) + +typedef struct ep_s ep_t; +struct ep_s { + int ep; + + pthread_mutex_t lock; + int sv[2]; // 0 for read, 1 for write; + pthread_t thread; + + volatile unsigned int stopping:1; + volatile unsigned int waiting:1; + volatile unsigned int wakenup:1; +}; + +static int ep_dummy = 0; + +static ep_t* ep_create(void); +static void ep_destroy(ep_t *ep); +static void* routine(void* arg); +static int open_connect(unsigned short port); +static int open_listen(unsigned short port); + +typedef struct client_s client_t; +struct client_s { + int skt; + void (*on_event)(ep_t *ep, struct epoll_event *events, client_t *client); + volatile unsigned int state; // 1: listenning; 2: connected +}; + +static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client); + +int main(int argc, char *argv[]) { + ep_t* ep = ep_create(); + A(ep, "failed"); + int skt = open_connect(6789); + if (skt!=-1) { + client_t *client = (client_t*)calloc(1, sizeof(*client)); + if (client) { + client->skt = skt; + client->on_event = echo_event; + client->state = 2; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = client; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + skt = open_listen(0); + if (skt!=-1) { + client_t *client = (client_t*)calloc(1, sizeof(*client)); + if (client) { + client->skt = skt; + client->on_event = echo_event; + client->state = 1; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = client; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + // char c = '\0'; + // while ((c=getchar())!=EOF) { + // switch (c) { + // case 'q': break; + // default: continue; + // } + // } + // getchar(); + char *line = NULL; + size_t linecap = 0; + ssize_t linelen; + while ((linelen = getline(&line, &linecap, stdin)) > 0) { + line[strlen(line)-1] = '\0'; + if (0==strcmp(line, "exit")) break; + if (0==strcmp(line, "quit")) break; + if (line==strstr(line, "close")) { + int fd = 0; + sscanf(line, "close %d", &fd); + if (fd<=2) { + fprintf(stderr, "fd [%d] invalid\n", fd); + continue; + } + A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), ""); + continue; + } + if (strlen(line)==0) continue; + fprintf(stderr, "unknown cmd:[%s]\n", line); + } + ep_destroy(ep); + D(""); + return 0; +} + +ep_t* ep_create(void) { + ep_t *ep = (ep_t*)calloc(1, sizeof(*ep)); + A(ep, "out of memory"); + A(-1!=(ep->ep = epoll_create(1)), ""); + ep->sv[0] = -1; + ep->sv[1] = -1; + A(0==socketpair(AF_LOCAL, SOCK_STREAM, 0, ep->sv), ""); + A(0==pthread_mutex_init(&ep->lock, NULL), ""); + A(0==pthread_mutex_lock(&ep->lock), ""); + struct epoll_event ev = {0}; + ev.events = EPOLLIN; + ev.data.ptr = &ep_dummy; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, ep->sv[0], &ev), ""); + A(0==pthread_create(&ep->thread, NULL, routine, ep), ""); + A(0==pthread_mutex_unlock(&ep->lock), ""); + return ep; +} + +static void ep_destroy(ep_t *ep) { + A(ep, "invalid argument"); + ep->stopping = 1; + A(1==send(ep->sv[1], "1", 1, 0), ""); + A(0==pthread_join(ep->thread, NULL), ""); + A(0==pthread_mutex_destroy(&ep->lock), ""); + A(0==close(ep->sv[0]), ""); + A(0==close(ep->sv[1]), ""); + A(0==close(ep->ep), ""); + free(ep); +} + +static void* routine(void* arg) { + A(arg, "invalid argument"); + ep_t *ep = (ep_t*)arg; + + while (!ep->stopping) { + struct epoll_event evs[10]; + memset(evs, 0, sizeof(evs)); + + A(0==pthread_mutex_lock(&ep->lock), ""); + A(ep->waiting==0, "internal logic error"); + ep->waiting = 1; + A(0==pthread_mutex_unlock(&ep->lock), ""); + + int r = epoll_wait(ep->ep, evs, sizeof(evs)/sizeof(evs[0]), -1); + A(r>0, "indefinite epoll_wait shall not timeout:[%d]", r); + + A(0==pthread_mutex_lock(&ep->lock), ""); + A(ep->waiting==1, "internal logic error"); + ep->waiting = 0; + A(0==pthread_mutex_unlock(&ep->lock), ""); + + for (int i=0; idata.ptr == &ep_dummy) { + char c = '\0'; + A(1==recv(ep->sv[0], &c, 1, 0), "internal logic error"); + A(0==pthread_mutex_lock(&ep->lock), ""); + ep->wakenup = 0; + A(0==pthread_mutex_unlock(&ep->lock), ""); + D("........"); + continue; + } + A(ev->data.ptr, "internal logic error"); + client_t *client = (client_t*)ev->data.ptr; + client->on_event(ep, ev, client); + continue; + } + } + return NULL; +} + +static int open_listen(unsigned short port) { + int r = 0; + int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (skt==-1) { + E("socket() failed"); + return -1; + } + do { + struct sockaddr_in si = {0}; + si.sin_family = AF_INET; + si.sin_addr.s_addr = inet_addr("127.0.0.1"); + si.sin_port = htons(port); + r = bind(skt, (struct sockaddr*)&si, sizeof(si)); + if (r) { + E("bind(%u) failed", port); + break; + } + r = listen(skt, 100); + if (r) { + E("listen() failed"); + break; + } + memset(&si, 0, sizeof(si)); + socklen_t len = sizeof(si); + r = getsockname(skt, (struct sockaddr *)&si, &len); + if (r) { + E("getsockname() failed"); + } + A(len==sizeof(si), "internal logic error"); + D("listenning at: %d", ntohs(si.sin_port)); + return skt; + } while (0); + close(skt); + return -1; +} + +static int open_connect(unsigned short port) { + int r = 0; + int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (skt==-1) { + E("socket() failed"); + return -1; + } + do { + struct sockaddr_in si = {0}; + si.sin_family = AF_INET; + si.sin_addr.s_addr = inet_addr("127.0.0.1"); + si.sin_port = htons(port); + r = connect(skt, (struct sockaddr*)&si, sizeof(si)); + if (r) { + E("connect(%u) failed", port); + break; + } + memset(&si, 0, sizeof(si)); + socklen_t len = sizeof(si); + r = getsockname(skt, (struct sockaddr *)&si, &len); + if (r) { + E("getsockname() failed"); + } + A(len==sizeof(si), "internal logic error"); + D("connected: %d", ntohs(si.sin_port)); + return skt; + } while (0); + close(skt); + return -1; +} + +static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client) { + if (ev->events & EPOLLIN) { + if (client->state==1) { + struct sockaddr_in si = {0}; + socklen_t silen = sizeof(si); + int skt = accept(client->skt, (struct sockaddr*)&si, &silen); + if (skt!=-1) { + client_t *server = (client_t*)calloc(1, sizeof(*server)); + if (server) { + server->skt = skt; + server->on_event = echo_event; + server->state = 2; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = server; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + } + if (client->state==2) { + char buf[4]; + int n = recv(client->skt, buf, sizeof(buf)-1, 0); + A(n>=0 && nevents, buf); + } + } + if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + A(0==pthread_mutex_lock(&ep->lock), ""); + A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), ""); + A(0==pthread_mutex_unlock(&ep->lock), ""); + close(client->skt); + client->skt = -1; + client->on_event = NULL; + free(client); + } +} +