From a649f55195c37c45c7f66a4d42a3268b7603f48b Mon Sep 17 00:00:00 2001 From: freemine Date: Fri, 15 Jan 2021 22:34:48 +0800 Subject: [PATCH] eok --- src/CMakeLists.txt | 2 +- src/client/CMakeLists.txt | 4 +- src/client/src/tscSQLParser.c | 2 + src/client/src/tscUtil.c | 4 + src/kit/shell/src/shellDarwin.c | 2 + src/os/inc/eok.h | 74 ++++ src/os/inc/osDarwin.h | 11 +- src/os/src/darwin/darwinEnv.c | 3 + src/os/src/darwin/eok.c | 580 ++++++++++++++++++++++++++++++ src/os/src/detail/osSocket.c | 3 +- src/plugins/http/src/httpServer.c | 4 + src/rpc/src/rpcTcp.c | 22 ++ src/sync/src/syncTcp.c | 14 + src/tsdb/inc/tsdbMain.h | 6 +- src/tsdb/src/tsdbCommit.c | 4 + src/tsdb/src/tsdbMain.c | 16 + src/tsdb/src/tsdbMemTable.c | 11 +- src/util/src/tsocket.c | 4 +- tests/examples/c/CMakeLists.txt | 5 + 19 files changed, 763 insertions(+), 8 deletions(-) create mode 100644 src/os/inc/eok.h create mode 100644 src/os/src/darwin/eok.c diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 931a0a132e..f7304ae72f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,5 +19,5 @@ ADD_SUBDIRECTORY(wal) ADD_SUBDIRECTORY(cq) ADD_SUBDIRECTORY(dnode) #ADD_SUBDIRECTORY(connector/odbc) -ADD_SUBDIRECTORY(connector/jdbc) +#ADD_SUBDIRECTORY(connector/jdbc) diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index daf7c5e534..3fd29b474e 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -49,12 +49,12 @@ ELSEIF (TD_DARWIN) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) ADD_LIBRARY(taos_static STATIC ${SRC}) - TARGET_LINK_LIBRARIES(taos_static trpc tutil pthread m) + TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m) SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") # generate dynamic library (*.dylib) ADD_LIBRARY(taos SHARED ${SRC}) - TARGET_LINK_LIBRARIES(taos trpc tutil pthread m) + TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m) SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index a08482f570..4a24f2bfdb 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -13,10 +13,12 @@ * along with this program. If not, see . */ +#ifndef __APPLE__ #define _BSD_SOURCE #define _XOPEN_SOURCE 500 #define _DEFAULT_SOURCE #define _GNU_SOURCE +#endif // __APPLE__ #include "os.h" #include "ttype.h" diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b44ebb3c98..d6151c1a88 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2486,7 +2486,11 @@ bool tscSetSqlOwner(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; // set the sql object owner +#ifdef __APPLE__ + pthread_t threadId = (pthread_t)taosGetSelfPthreadId(); +#else uint64_t threadId = taosGetSelfPthreadId(); +#endif if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) { pRes->code = TSDB_CODE_QRY_IN_EXEC; return false; diff --git a/src/kit/shell/src/shellDarwin.c b/src/kit/shell/src/shellDarwin.c index ddf7b21bef..d6aed4401c 100644 --- a/src/kit/shell/src/shellDarwin.c +++ b/src/kit/shell/src/shellDarwin.c @@ -21,6 +21,8 @@ #include "shellCommand.h" #include "tkey.h" +#include "tscLog.h" + #define OPT_ABORT 1 /* �Cabort */ int indicator = 1; diff --git a/src/os/inc/eok.h b/src/os/inc/eok.h new file mode 100644 index 0000000000..8892e50c35 --- /dev/null +++ b/src/os/inc/eok.h @@ -0,0 +1,74 @@ +#ifndef _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ +#define _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +enum EPOLL_EVENTS + { + EPOLLIN = 0x001, +#define EPOLLIN EPOLLIN + EPOLLPRI = 0x002, +#define EPOLLPRI EPOLLPRI + EPOLLOUT = 0x004, +#define EPOLLOUT EPOLLOUT + EPOLLRDNORM = 0x040, +#define EPOLLRDNORM EPOLLRDNORM + EPOLLRDBAND = 0x080, +#define EPOLLRDBAND EPOLLRDBAND + EPOLLWRNORM = 0x100, +#define EPOLLWRNORM EPOLLWRNORM + EPOLLWRBAND = 0x200, +#define EPOLLWRBAND EPOLLWRBAND + EPOLLMSG = 0x400, +#define EPOLLMSG EPOLLMSG + EPOLLERR = 0x008, +#define EPOLLERR EPOLLERR + EPOLLHUP = 0x010, +#define EPOLLHUP EPOLLHUP + EPOLLRDHUP = 0x2000, +#define EPOLLRDHUP EPOLLRDHUP + EPOLLEXCLUSIVE = 1u << 28, +#define EPOLLEXCLUSIVE EPOLLEXCLUSIVE + EPOLLWAKEUP = 1u << 29, +#define EPOLLWAKEUP EPOLLWAKEUP + EPOLLONESHOT = 1u << 30, +#define EPOLLONESHOT EPOLLONESHOT + EPOLLET = 1u << 31 +#define EPOLLET EPOLLET + }; + +/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */ +#define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */ +#define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */ +#define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */ + + +typedef union epoll_data +{ + void *ptr; + int fd; + uint32_t u32; + uint64_t u64; +} epoll_data_t; + +struct epoll_event +{ + uint32_t events; /* Epoll events */ + epoll_data_t data; /* User data variable */ +}; + +int epoll_create(int size); +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); +int epoll_close(int epfd); + +#ifdef __cplusplus +} +#endif + +#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ + diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h index 1461ec6d3b..52bb661f89 100644 --- a/src/os/inc/osDarwin.h +++ b/src/os/inc/osDarwin.h @@ -91,7 +91,7 @@ extern "C" { typedef int(*__compar_fn_t)(const void *, const void *); // for send function in tsocket.c -#define MSG_NOSIGNAL 0 +// #define MSG_NOSIGNAL 0 #define SO_NO_CHECK 0x1234 #define SOL_TCP 0x1234 #define TCP_KEEPIDLE 0x1234 @@ -100,6 +100,15 @@ typedef int(*__compar_fn_t)(const void *, const void *); #define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE #endif +int64_t tsosStr2int64(char *str); + +#include "eok.h" + + + + + + #ifdef __cplusplus } #endif diff --git a/src/os/src/darwin/darwinEnv.c b/src/os/src/darwin/darwinEnv.c index 6adebabec0..28388f24d2 100644 --- a/src/os/src/darwin/darwinEnv.c +++ b/src/os/src/darwin/darwinEnv.c @@ -17,6 +17,8 @@ #include "os.h" #include "tglobal.h" +#include + void osInit() { if (configDir[0] == 0) { strcpy(configDir, "~/TDengine/cfg"); @@ -30,3 +32,4 @@ void osInit() { strcpy(tsScriptDir, "~/TDengine/cfg"); strcpy(tsOsName, "Darwin"); } + diff --git a/src/os/src/darwin/eok.c b/src/os/src/darwin/eok.c new file mode 100644 index 0000000000..ea67223c25 --- /dev/null +++ b/src/os/src/darwin/eok.c @@ -0,0 +1,580 @@ +#include "eok.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) +#define A(statement, fmt, ...) do { \ + if (statement) break; \ + fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + #statement, errno, strerror(errno), \ + ##__VA_ARGS__); \ + abort(); \ +} while (0) + +#define E(fmt, ...) do { \ + fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \ + basename(__FILE__), __LINE__, __func__, \ + errno, strerror(errno), \ + ##__VA_ARGS__); \ +} while (0) + +static int eok_dummy = 0; + +typedef struct ep_over_kq_s ep_over_kq_t; +typedef struct eok_event_s eok_event_t; + +struct ep_over_kq_s { + int kq; + int idx; + ep_over_kq_t *next; + + int sv[2]; // 0 for read, 1 for write + + eok_event_t *evs_head; + eok_event_t *evs_tail; + eok_event_t *evs_free; + + struct kevent64_s *kchanges; + int nchanges; + int ichanges; + + struct kevent64_s *kevslist; + int nevslist; + + pthread_mutex_t lock; + + volatile unsigned int lock_valid:1; + volatile unsigned int waiting:1; + volatile unsigned int changed:1; + volatile unsigned int wakenup:1; + volatile unsigned int stopping:1; +}; + +struct eok_event_s { + int fd; + struct epoll_event epev; + volatile unsigned int changed; // 0:registered;1:add;2:mod;3:del + + eok_event_t *next; + eok_event_t *prev; +}; + +typedef struct eoks_s eoks_t; +struct eoks_s { + pthread_mutex_t lock; + ep_over_kq_t *eoks; + int neoks; + ep_over_kq_t *eoks_free; +}; +static eoks_t eoks = { + .lock = PTHREAD_MUTEX_INITIALIZER, + .eoks = NULL, + .neoks = 0, + .eoks_free = NULL, +}; + +static ep_over_kq_t* eoks_alloc(void); +static void eoks_free(ep_over_kq_t *eok); +static ep_over_kq_t* eoks_find(int epfd); + +static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd); +static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok); +static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev); +static void eok_wakeup(ep_over_kq_t *eok); + +static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev); + +static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents); + +int epoll_create(int size) { + (void)size; + int e = 0; + ep_over_kq_t *eok = eoks_alloc(); + if (!eok) { + errno = ENOMEM; + return -1; + } + + A(eok->kq==-1, "internal logic error"); + A(eok->lock_valid, "internal logic error"); + A(eok->idx>=0 && eok->idxnext==NULL, "internal logic error"); + A(eok->sv[0]==-1, "internal logic error"); + A(eok->sv[1]==-1, "internal logic error"); + + eok->kq = kqueue(); + if (eok->kq==-1) { + e = errno; + eoks_free(eok); + errno = e; + return -1; + } + + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, eok->sv)) { + e = errno; + eoks_free(eok); + errno = e; + return -1; + } + + struct epoll_event ev = {0}; + ev.events = EPOLLIN; + ev.data.ptr = &eok_dummy; + if (epoll_ctl(eok->idx, EPOLL_CTL_ADD, eok->sv[0], &ev)) { + A(0, "internal logic error"); + epoll_close(eok->idx); + return -1; + } + + return eok->idx; +} + +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { + int e = 0; + if (epfd<0 || epfd>=eoks.neoks) { + errno = EBADF; + return -1; + } + if (fd==-1) { + errno = EBADF; + return -1; + } + if (event && !(event->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLOUT))) { + e = ENOTSUP; + return -1; + } + + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + return -1; + } + + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + eok_event_t* oev = eok_find_ev(eok, fd); + if (op==EPOLL_CTL_ADD && oev) { + e = EEXIST; + break; + } + if (op!=EPOLL_CTL_ADD && !oev) { + e = ENOENT; + break; + } + + struct kevent64_s krev = {0}; + struct kevent64_s kwev = {0}; + krev.ident = -1; + kwev.ident = -1; + uint16_t flags = 0; + eok_event_t ev = {0}; + ev.fd = fd; + if (event) ev.epev = *event; + struct epoll_event *pev = event; + switch (op) { + case EPOLL_CTL_ADD: { + if (!event) { + e = EINVAL; + break; + } + flags = EV_ADD; + ev.changed = 1; + } break; + case EPOLL_CTL_MOD: { + if (!event) { + e = EINVAL; + break; + } + flags = EV_ADD; + ev.changed = 2; + } break; + case EPOLL_CTL_DEL: { + // event is ignored + pev = &oev->epev; + flags = EV_DELETE; + ev.changed = 3; + } break; + default: { + e = ENOTSUP; + } break; + } + + if (e) break; + + if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + flags |= EV_EOF; + EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0); + D("...."); + } + if (pev->events & EPOLLOUT) { + EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0); + D("...."); + } + + if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) { + e = errno; + A(e, "internal logic error"); + break; + } + eok->changed = 1; + eok_wakeup(eok); + } while (0); + A(0==pthread_mutex_unlock(&eok->lock), ""); + + if (e) { + errno = e; + return -1; + } + + return 0; +} + +static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to); + +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) { + int e = 0; + if (epfd<0 || epfd>=eoks.neoks) { + errno = EBADF; + return -1; + } + if (!events) { + errno = EINVAL; + return -1; + } + if (maxevents<=0) { + errno = EINVAL; + return -1; + } + + int r = 0; + + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + return -1; + } + + struct timespec abstime = {0}; + A(TIME_UTC==timespec_get(&abstime, TIME_UTC), "internal logic error"); + + if (timeout!=-1) { + if (timeout<0) timeout = 0; + int64_t t = abstime.tv_nsec + timeout * 1000000; + abstime.tv_sec += t / 1000000000; + abstime.tv_nsec %= 1000000000; + } + + int cnts = 0; + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + cnts = 0; + A(eok->waiting==0, "internal logic error"); + struct kevent64_s *eventslist = eok_alloc_eventslist(eok, maxevents); + if (!eventslist) { + e = ENOMEM; + break; + } + memset(eventslist, 0, maxevents * sizeof(*eventslist)); + + struct timespec now = {0}; + A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error"); + struct timespec to = do_timespec_diff(&now, &abstime); + struct timespec *pto = NULL; + if (timeout!=-1) { + pto = &to; + } + + eok->changed = 0; + eok->wakenup = 0; + eok->waiting = 1; + + struct kevent64_s *kchanges = eok->kchanges; + int nchanges = eok->nchanges; + int ichanges = eok->ichanges; + eok->kchanges = NULL; + eok->nchanges = 0; + eok->ichanges = 0; + + A(0==pthread_mutex_unlock(&eok->lock), ""); + r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto); + e = errno; + A(0==pthread_mutex_lock(&eok->lock), ""); + + eok->waiting = 0; + if (kchanges) { + free(kchanges); + kchanges = NULL; + nchanges = 0; + ichanges = 0; + } + + eok->waiting = 0; + if (r<0) break; + if (r==0) { + A(timeout!=-1, "internal logic error"); + } + for (int i=0; iudata && eok->evs_head && eok->evs_tail, "internal logic error"); + eok_event_t *ev = (eok_event_t*)kev->udata; + A(kev->ident == ev->fd, "internal logic error"); + D("..."); + switch (kev->filter) { + case EVFILT_READ: { + A((ev->epev.events & EPOLLIN), "internal logic errro"); + if (ev->epev.data.ptr==&eok_dummy) { + char c = '\0'; + A(1==recv(kev->ident, &c, 1, 0), "internal logic error"); + A(0==memcmp(&c, "1", 1), "internal logic error"); + D("..............."); + } else { + if (ev->changed==3) { + D("already requested to delete"); + // EV_DELETE? + continue; + } + struct epoll_event pev = {0}; + pev.data.ptr = ev->epev.data.ptr; + pev.events = EPOLLIN; + if (kev->flags & EV_EOF) { + pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP); + } + pev.events &= ev->epev.events; + events[cnts++] = pev; + } + } break; + case EVFILT_WRITE: { + A(0, "not implemented yet"); + } break; + default: { + A(0, "internal logic error"); + } break; + } + } + } while (cnts==0); + A(0==pthread_mutex_unlock(&eok->lock), ""); + + if (e) { + errno = e; + return -1; + } + + return cnts; +} + +static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to) { + struct timespec delta; + delta.tv_sec = to->tv_sec - from->tv_sec; + delta.tv_nsec = to->tv_nsec - from->tv_nsec; + while (delta.tv_nsec<0) { + delta.tv_sec -= 1; + delta.tv_nsec += 1000000000; + } + return delta; +} + +int epoll_close(int epfd) { + if (epfd<0 || epfd>=eoks.neoks) { + errno = EBADF; + return -1; + } + ep_over_kq_t *eok = eoks_find(epfd); + if (!eok) { + errno = EBADF; + return -1; + } + + A(0==pthread_mutex_lock(&eok->lock), ""); + do { + A(eok->stopping==0, "internal logic error"); + eok->stopping = 1; + A(eok->waiting, "internal logic error"); + + if (eok->kq!=-1) { + close(eok->kq); + eok->kq = -1; + } + } while (0); + A(0==pthread_mutex_unlock(&eok->lock), ""); + eoks_free(eok); + + return 0; +} + +static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents) { + if (maxevents<=eok->nevslist) return eok->kevslist; + struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kevslist, sizeof(*p)*maxevents); + if (!p) return NULL; + eok->kevslist = p; + eok->nevslist = maxevents; + return p; +} + +static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd) { + eok_event_t *p = eok->evs_head; + while (p) { + if (p->fd == fd) return p; + p = p->next; + } + errno = ENOENT; + return NULL; +} + +static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) { + eok_event_t *p = NULL; + if (eok->evs_free) { + p = eok->evs_free; + eok->evs_free = p->next; + p->next = NULL; + } else { + p = (eok_event_t*)calloc(1, sizeof(*p)); + if (!p) return NULL; + } + // dirty link + p->prev = eok->evs_tail; + if (eok->evs_tail) eok->evs_tail->next = p; + else eok->evs_head = p; + eok->evs_tail = p; + + return p; +} + +static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) { + if (ev->prev) ev->prev->next = ev->next; + else eok->evs_head = ev->next; + ev->prev = NULL; + if (ev->next) ev->next->prev = ev->prev; + else eok->evs_tail = ev->prev; + ev->next = eok->evs_free; + eok->evs_free = ev->next; +} + +static void eok_wakeup(ep_over_kq_t *eok) { + if (!eok->waiting) return; + if (eok->wakenup) return; + eok->wakenup = 1; + send(eok->sv[1], "1", 1, 0); +} + +static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev) { + if (!oev) oev = eok_calloc_ev(eok); + if (!oev) return -1; + int n = 0; + if (krev->ident==ev->fd) ++n; + if (kwev->ident==ev->fd) ++n; + A(n, "internal logic error"); + if (eok->ichanges+n>eok->nchanges) { + struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kchanges, sizeof(*p) * (eok->nchanges + 10)); + if (!p) { + if (ev->changed==1) { + eok_free_ev(eok, oev); + } + errno = ENOMEM; + return -1; + } + eok->kchanges = p; + eok->nchanges += 10; + } + + oev->fd = ev->fd; + oev->epev = ev->epev; + oev->changed = ev->changed; + + if (krev->ident==ev->fd) { + krev->udata = (uint64_t)oev; + eok->kchanges[eok->ichanges++] = *krev; + } + if (kwev->ident==ev->fd) { + kwev->udata = (uint64_t)oev; + eok->kchanges[eok->ichanges++] = *kwev; + } + return 0; +} + +static ep_over_kq_t* eoks_alloc(void) { + ep_over_kq_t *eok = NULL; + + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + if (eoks.eoks_free) { + eok = eoks.eoks_free; + eoks.eoks_free = eok->next; + eok->next = NULL; + break; + } + ep_over_kq_t *p = (ep_over_kq_t*)realloc(eoks.eoks, sizeof(*p) * (eoks.neoks+1)); + if (!p) break; + eoks.eoks = p; + eok = eoks.eoks + eoks.neoks; + memset(eok, 0, sizeof(*eok)); + eok->idx = eoks.neoks; + eok->kq = -1; + eok->sv[0] = -1; + eok->sv[1] = -1; + eoks.neoks += 1; + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); + + if (!eok) return NULL; + if (eok->lock_valid) { + return eok; + } + + if (0==pthread_mutex_init(&eok->lock, NULL)) { + eok->lock_valid = 1; + return eok; + } + + eoks_free(eok); + return NULL; +} + +static void eoks_free(ep_over_kq_t *eok) { + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + A(eok->next==NULL, "internal logic error"); + A(eok->evs_head==NULL, "internal logic error"); + A(eok->waiting==0, "internal logic error"); + if (eok->sv[0]!=-1) { + close(eok->sv[0]); + eok->sv[0] = -1; + } + if (eok->sv[1]!=-1) { + close(eok->sv[1]); + eok->sv[1] = -1; + } + if (eok->kq!=-1) { + close(eok->kq); + eok->kq = -1; + } + eok->next = eoks.eoks_free; + eoks.eoks_free = eok; + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); +} + +static ep_over_kq_t* eoks_find(int epfd) { + ep_over_kq_t *eok = NULL; + A(0==pthread_mutex_lock(&eoks.lock), ""); + do { + if (epfd<0 || epfd>=eoks.neoks) { + break; + } + A(eoks.eoks, "internal logic error"); + eok = eoks.eoks + epfd; + A(eok->next==NULL, "internal logic error"); + A(eok->lock_valid, "internal logic error"); + } while (0); + A(0==pthread_mutex_unlock(&eoks.lock), ""); + return eok; +} + diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c index 729471247f..c6cbbe6b83 100644 --- a/src/os/src/detail/osSocket.c +++ b/src/os/src/detail/osSocket.c @@ -58,6 +58,7 @@ void taosBlockSIGPIPE() { #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); } @@ -73,4 +74,4 @@ const char *taosInetNtoa(struct in_addr ipInt) { return inet_ntoa(ipInt); } -#endif \ No newline at end of file +#endif diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 4896d50c6c..1cc73aef06 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -52,7 +52,11 @@ static void httpStopThread(HttpThread* pThread) { close(fd); } +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else close(pThread->pollFd); +#endif pthread_mutex_destroy(&(pThread->threadMutex)); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 178b96c423..ff2a5882f8 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -133,6 +133,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); code = -1; @@ -293,6 +294,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * } pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP client epoll", label); free(pThreadObj); @@ -307,7 +309,11 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); pthread_attr_destroy(&thattr); if (code != 0) { +#ifdef __APPLE__ + epoll_close(pThreadObj->pollFd); +#else taosCloseSocket(pThreadObj->pollFd); +#endif free(pThreadObj); terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); @@ -337,6 +343,10 @@ void taosCleanUpTcpClient(void *chandle) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { SThreadObj * pThreadObj = shandle; + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "pThreadObj->ip:%d\n", pThreadObj->ip); + fprintf(stderr, "PF_INET/AF_INET:%d/%d\n", PF_INET, AF_INET); + fprintf(stderr, "ip/port:%x/%d\n", ip, port); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); if (fd < 0) return NULL; @@ -348,6 +358,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin localPort = (uint16_t)ntohs(sin.sin_port); } + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); if (pFdObj) { @@ -358,6 +369,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); } else { tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); taosCloseSocket(fd); } @@ -480,27 +492,32 @@ static void *taosProcessTcpData(void *param) { if (fdNum < 0) continue; for (int i = 0; i < fdNum; ++i) { + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); pFdObj = events[i].data.ptr; if (events[i].events & EPOLLERR) { tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLRDHUP) { tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLHUP) { tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); taosReportBrokenLink(pFdObj); continue; } if (taosReadTcpData(pFdObj, &recvInfo) < 0) { + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); shutdown(pFdObj->fd, SHUT_WR); continue; } @@ -512,7 +529,11 @@ static void *taosProcessTcpData(void *param) { if (pThreadObj->stop) break; } +#ifdef __APPLE__ + if (pThreadObj->pollFd >=0) epoll_close(pThreadObj->pollFd); +#else if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); +#endif while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; @@ -542,6 +563,7 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { event.events = EPOLLIN | EPOLLRDHUP; event.data.ptr = pFdObj; + fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__); if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { tfree(pFdObj); terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 4744666737..be85853819 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -136,12 +136,15 @@ void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { event.events = EPOLLIN | EPOLLRDHUP; event.data.ptr = pConn; + fprintf(stderr, ">>>>>>>>>>>>>>>>>\n"); if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { + fprintf(stderr, "<<<<<<<<<<<<<<<<<\n"); sError("failed to add fd:%d since %s", connFd, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); tfree(pConn); pConn = NULL; } else { + fprintf(stderr, "<<<<<<<<<<<<<<<<<\n"); pThread->numOfFds++; sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); } @@ -167,7 +170,9 @@ static void taosProcessBrokenLink(SConnObj *pConn) { (*pInfo->processBrokenLink)(pConn->handleId); pThread->numOfFds--; + fprintf(stderr, "<<<<<<<<<<<<<<<<<\n"); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); + fprintf(stderr, "<<<<<<<<<<<<<<<<<\n"); sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); taosClose(pConn->fd); tfree(pConn); @@ -233,7 +238,11 @@ static void *syncProcessTcpData(void *param) { sDebug("%p TCP epoll thread exits", pThread); +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else close(pThread->pollFd); +#endif tfree(pThread); tfree(buffer); return NULL; @@ -278,6 +287,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { pThread->pPool = pPool; pThread->pollFd = epoll_create(10); // size does not matter + fprintf(stderr, "...............\n"); if (pThread->pollFd < 0) { tfree(pThread); return NULL; @@ -290,7 +300,11 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { pthread_attr_destroy(&thattr); if (ret != 0) { +#ifdef __APPLE__ + epoll_close(pThread->pollFd); +#else close(pThread->pollFd); +#endif tfree(pThread); return NULL; } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5067974903..984839162e 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -233,7 +233,11 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; +#ifdef __APPLE__ + sem_t *readyToCommit; +#else sem_t readyToCommit; +#endif pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code @@ -616,4 +620,4 @@ int tsdbScheduleCommit(STsdbRepo *pRepo); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 696270d670..9551d1b3a5 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -166,7 +166,11 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { pRepo->imem = NULL; tsdbUnlockRepo(pRepo); tsdbUnRefMemTable(pRepo, pIMem); +#ifdef __APPLE__ + sem_post(pRepo->readyToCommit); +#else sem_post(&(pRepo->readyToCommit)); +#endif } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b34b2fa9e6..602aea70e3 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -146,7 +146,11 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); +#else sem_wait(&(pRepo->readyToCommit)); +#endif terrno = pRepo->code; } tsdbUnRefMemTable(pRepo, pRepo->mem); @@ -643,11 +647,19 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } +#ifdef __APPLE__ + pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1); + if (!pRepo->readyToCommit) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } +#else code = sem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); goto _err; } +#endif pRepo->repoLocked = false; @@ -693,7 +705,11 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); tfree(pRepo->rootDir); +#ifdef __APPLE__ + sem_close(pRepo->readyToCommit); +#else sem_destroy(&(pRepo->readyToCommit)); +#endif pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 07f001f68a..202405df00 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -207,7 +207,11 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { if (pRepo->mem == NULL) return 0; +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); +#else sem_wait(&(pRepo->readyToCommit)); +#endif ASSERT(pRepo->imem == NULL); @@ -229,8 +233,13 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; tsdbAsyncCommit(pRepo); +#ifdef __APPLE__ + sem_wait(pRepo->readyToCommit); + sem_post(pRepo->readyToCommit); +#else sem_wait(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit)); +#endif if (pRepo->code != TSDB_CODE_SUCCESS) { terrno = pRepo->code; @@ -927,4 +936,4 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow } return 0; -} \ No newline at end of file +} diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 1a5c3bd64d..6b15d3677c 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -35,7 +35,7 @@ int32_t taosGetFqdn(char *fqdn) { hints.ai_flags = AI_CANONNAME; int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + uError("failed to get fqdn for hostname <%s>, code:%d, reason:%s", hostname, ret, gai_strerror(ret)); return -1; } @@ -341,6 +341,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) { return -1; } +#ifndef __APPLE__ int32_t probes = 3; if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); @@ -361,6 +362,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) { taosCloseSocket(sockFd); return -1; } +#endif int32_t nodelay = 1; if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { diff --git a/tests/examples/c/CMakeLists.txt b/tests/examples/c/CMakeLists.txt index 59bcb6eaff..4689ed8868 100644 --- a/tests/examples/c/CMakeLists.txt +++ b/tests/examples/c/CMakeLists.txt @@ -6,3 +6,8 @@ IF (TD_LINUX) ADD_EXECUTABLE(demo demo.c) TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ) ENDIF () +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(. ${TD_COMMUNITY_DIR}/src/inc ${TD_COMMUNITY_DIR}/src/client/inc ${TD_COMMUNITY_DIR}/inc) + ADD_EXECUTABLE(epoll epoll.c) + TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread ) +ENDIF ()