eok
This commit is contained in:
parent
a67f7504df
commit
a649f55195
|
@ -19,5 +19,5 @@ ADD_SUBDIRECTORY(wal)
|
||||||
ADD_SUBDIRECTORY(cq)
|
ADD_SUBDIRECTORY(cq)
|
||||||
ADD_SUBDIRECTORY(dnode)
|
ADD_SUBDIRECTORY(dnode)
|
||||||
#ADD_SUBDIRECTORY(connector/odbc)
|
#ADD_SUBDIRECTORY(connector/odbc)
|
||||||
ADD_SUBDIRECTORY(connector/jdbc)
|
#ADD_SUBDIRECTORY(connector/jdbc)
|
||||||
|
|
||||||
|
|
|
@ -49,12 +49,12 @@ ELSEIF (TD_DARWIN)
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
|
||||||
|
|
||||||
ADD_LIBRARY(taos_static STATIC ${SRC})
|
ADD_LIBRARY(taos_static STATIC ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(taos_static trpc tutil pthread m)
|
TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m)
|
||||||
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
|
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
|
||||||
|
|
||||||
# generate dynamic library (*.dylib)
|
# generate dynamic library (*.dylib)
|
||||||
ADD_LIBRARY(taos SHARED ${SRC})
|
ADD_LIBRARY(taos SHARED ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(taos trpc tutil pthread m)
|
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m)
|
||||||
|
|
||||||
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
|
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
|
||||||
|
|
||||||
|
|
|
@ -13,10 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef __APPLE__
|
||||||
#define _BSD_SOURCE
|
#define _BSD_SOURCE
|
||||||
#define _XOPEN_SOURCE 500
|
#define _XOPEN_SOURCE 500
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#define _GNU_SOURCE
|
#define _GNU_SOURCE
|
||||||
|
#endif // __APPLE__
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "ttype.h"
|
#include "ttype.h"
|
||||||
|
|
|
@ -2486,7 +2486,11 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
|
||||||
SSqlRes* pRes = &pSql->res;
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
// set the sql object owner
|
// set the sql object owner
|
||||||
|
#ifdef __APPLE__
|
||||||
|
pthread_t threadId = (pthread_t)taosGetSelfPthreadId();
|
||||||
|
#else
|
||||||
uint64_t threadId = taosGetSelfPthreadId();
|
uint64_t threadId = taosGetSelfPthreadId();
|
||||||
|
#endif
|
||||||
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
||||||
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -21,6 +21,8 @@
|
||||||
#include "shellCommand.h"
|
#include "shellCommand.h"
|
||||||
#include "tkey.h"
|
#include "tkey.h"
|
||||||
|
|
||||||
|
#include "tscLog.h"
|
||||||
|
|
||||||
#define OPT_ABORT 1 /* <20>Cabort */
|
#define OPT_ABORT 1 /* <20>Cabort */
|
||||||
|
|
||||||
int indicator = 1;
|
int indicator = 1;
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
#ifndef _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
|
||||||
|
#define _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
enum EPOLL_EVENTS
|
||||||
|
{
|
||||||
|
EPOLLIN = 0x001,
|
||||||
|
#define EPOLLIN EPOLLIN
|
||||||
|
EPOLLPRI = 0x002,
|
||||||
|
#define EPOLLPRI EPOLLPRI
|
||||||
|
EPOLLOUT = 0x004,
|
||||||
|
#define EPOLLOUT EPOLLOUT
|
||||||
|
EPOLLRDNORM = 0x040,
|
||||||
|
#define EPOLLRDNORM EPOLLRDNORM
|
||||||
|
EPOLLRDBAND = 0x080,
|
||||||
|
#define EPOLLRDBAND EPOLLRDBAND
|
||||||
|
EPOLLWRNORM = 0x100,
|
||||||
|
#define EPOLLWRNORM EPOLLWRNORM
|
||||||
|
EPOLLWRBAND = 0x200,
|
||||||
|
#define EPOLLWRBAND EPOLLWRBAND
|
||||||
|
EPOLLMSG = 0x400,
|
||||||
|
#define EPOLLMSG EPOLLMSG
|
||||||
|
EPOLLERR = 0x008,
|
||||||
|
#define EPOLLERR EPOLLERR
|
||||||
|
EPOLLHUP = 0x010,
|
||||||
|
#define EPOLLHUP EPOLLHUP
|
||||||
|
EPOLLRDHUP = 0x2000,
|
||||||
|
#define EPOLLRDHUP EPOLLRDHUP
|
||||||
|
EPOLLEXCLUSIVE = 1u << 28,
|
||||||
|
#define EPOLLEXCLUSIVE EPOLLEXCLUSIVE
|
||||||
|
EPOLLWAKEUP = 1u << 29,
|
||||||
|
#define EPOLLWAKEUP EPOLLWAKEUP
|
||||||
|
EPOLLONESHOT = 1u << 30,
|
||||||
|
#define EPOLLONESHOT EPOLLONESHOT
|
||||||
|
EPOLLET = 1u << 31
|
||||||
|
#define EPOLLET EPOLLET
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
|
||||||
|
#define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */
|
||||||
|
#define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */
|
||||||
|
#define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */
|
||||||
|
|
||||||
|
|
||||||
|
typedef union epoll_data
|
||||||
|
{
|
||||||
|
void *ptr;
|
||||||
|
int fd;
|
||||||
|
uint32_t u32;
|
||||||
|
uint64_t u64;
|
||||||
|
} epoll_data_t;
|
||||||
|
|
||||||
|
struct epoll_event
|
||||||
|
{
|
||||||
|
uint32_t events; /* Epoll events */
|
||||||
|
epoll_data_t data; /* User data variable */
|
||||||
|
};
|
||||||
|
|
||||||
|
int epoll_create(int size);
|
||||||
|
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
|
||||||
|
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
|
||||||
|
int epoll_close(int epfd);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
|
||||||
|
|
|
@ -91,7 +91,7 @@ extern "C" {
|
||||||
typedef int(*__compar_fn_t)(const void *, const void *);
|
typedef int(*__compar_fn_t)(const void *, const void *);
|
||||||
|
|
||||||
// for send function in tsocket.c
|
// for send function in tsocket.c
|
||||||
#define MSG_NOSIGNAL 0
|
// #define MSG_NOSIGNAL 0
|
||||||
#define SO_NO_CHECK 0x1234
|
#define SO_NO_CHECK 0x1234
|
||||||
#define SOL_TCP 0x1234
|
#define SOL_TCP 0x1234
|
||||||
#define TCP_KEEPIDLE 0x1234
|
#define TCP_KEEPIDLE 0x1234
|
||||||
|
@ -100,6 +100,15 @@ typedef int(*__compar_fn_t)(const void *, const void *);
|
||||||
#define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE
|
#define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int64_t tsosStr2int64(char *str);
|
||||||
|
|
||||||
|
#include "eok.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
#include <sys/event.h>
|
||||||
|
|
||||||
void osInit() {
|
void osInit() {
|
||||||
if (configDir[0] == 0) {
|
if (configDir[0] == 0) {
|
||||||
strcpy(configDir, "~/TDengine/cfg");
|
strcpy(configDir, "~/TDengine/cfg");
|
||||||
|
@ -30,3 +32,4 @@ void osInit() {
|
||||||
strcpy(tsScriptDir, "~/TDengine/cfg");
|
strcpy(tsScriptDir, "~/TDengine/cfg");
|
||||||
strcpy(tsOsName, "Darwin");
|
strcpy(tsOsName, "Darwin");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,580 @@
|
||||||
|
#include "eok.h"
|
||||||
|
|
||||||
|
#include <errno.h>
|
||||||
|
#include <libgen.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/event.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
||||||
|
#define A(statement, fmt, ...) do { \
|
||||||
|
if (statement) break; \
|
||||||
|
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \
|
||||||
|
basename(__FILE__), __LINE__, __func__, \
|
||||||
|
#statement, errno, strerror(errno), \
|
||||||
|
##__VA_ARGS__); \
|
||||||
|
abort(); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define E(fmt, ...) do { \
|
||||||
|
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \
|
||||||
|
basename(__FILE__), __LINE__, __func__, \
|
||||||
|
errno, strerror(errno), \
|
||||||
|
##__VA_ARGS__); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
static int eok_dummy = 0;
|
||||||
|
|
||||||
|
typedef struct ep_over_kq_s ep_over_kq_t;
|
||||||
|
typedef struct eok_event_s eok_event_t;
|
||||||
|
|
||||||
|
struct ep_over_kq_s {
|
||||||
|
int kq;
|
||||||
|
int idx;
|
||||||
|
ep_over_kq_t *next;
|
||||||
|
|
||||||
|
int sv[2]; // 0 for read, 1 for write
|
||||||
|
|
||||||
|
eok_event_t *evs_head;
|
||||||
|
eok_event_t *evs_tail;
|
||||||
|
eok_event_t *evs_free;
|
||||||
|
|
||||||
|
struct kevent64_s *kchanges;
|
||||||
|
int nchanges;
|
||||||
|
int ichanges;
|
||||||
|
|
||||||
|
struct kevent64_s *kevslist;
|
||||||
|
int nevslist;
|
||||||
|
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
|
||||||
|
volatile unsigned int lock_valid:1;
|
||||||
|
volatile unsigned int waiting:1;
|
||||||
|
volatile unsigned int changed:1;
|
||||||
|
volatile unsigned int wakenup:1;
|
||||||
|
volatile unsigned int stopping:1;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct eok_event_s {
|
||||||
|
int fd;
|
||||||
|
struct epoll_event epev;
|
||||||
|
volatile unsigned int changed; // 0:registered;1:add;2:mod;3:del
|
||||||
|
|
||||||
|
eok_event_t *next;
|
||||||
|
eok_event_t *prev;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef struct eoks_s eoks_t;
|
||||||
|
struct eoks_s {
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
ep_over_kq_t *eoks;
|
||||||
|
int neoks;
|
||||||
|
ep_over_kq_t *eoks_free;
|
||||||
|
};
|
||||||
|
static eoks_t eoks = {
|
||||||
|
.lock = PTHREAD_MUTEX_INITIALIZER,
|
||||||
|
.eoks = NULL,
|
||||||
|
.neoks = 0,
|
||||||
|
.eoks_free = NULL,
|
||||||
|
};
|
||||||
|
|
||||||
|
static ep_over_kq_t* eoks_alloc(void);
|
||||||
|
static void eoks_free(ep_over_kq_t *eok);
|
||||||
|
static ep_over_kq_t* eoks_find(int epfd);
|
||||||
|
|
||||||
|
static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd);
|
||||||
|
static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok);
|
||||||
|
static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev);
|
||||||
|
static void eok_wakeup(ep_over_kq_t *eok);
|
||||||
|
|
||||||
|
static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev);
|
||||||
|
|
||||||
|
static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents);
|
||||||
|
|
||||||
|
int epoll_create(int size) {
|
||||||
|
(void)size;
|
||||||
|
int e = 0;
|
||||||
|
ep_over_kq_t *eok = eoks_alloc();
|
||||||
|
if (!eok) {
|
||||||
|
errno = ENOMEM;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
A(eok->kq==-1, "internal logic error");
|
||||||
|
A(eok->lock_valid, "internal logic error");
|
||||||
|
A(eok->idx>=0 && eok->idx<eoks.neoks, "internal logic error");
|
||||||
|
A(eok->next==NULL, "internal logic error");
|
||||||
|
A(eok->sv[0]==-1, "internal logic error");
|
||||||
|
A(eok->sv[1]==-1, "internal logic error");
|
||||||
|
|
||||||
|
eok->kq = kqueue();
|
||||||
|
if (eok->kq==-1) {
|
||||||
|
e = errno;
|
||||||
|
eoks_free(eok);
|
||||||
|
errno = e;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, eok->sv)) {
|
||||||
|
e = errno;
|
||||||
|
eoks_free(eok);
|
||||||
|
errno = e;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct epoll_event ev = {0};
|
||||||
|
ev.events = EPOLLIN;
|
||||||
|
ev.data.ptr = &eok_dummy;
|
||||||
|
if (epoll_ctl(eok->idx, EPOLL_CTL_ADD, eok->sv[0], &ev)) {
|
||||||
|
A(0, "internal logic error");
|
||||||
|
epoll_close(eok->idx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return eok->idx;
|
||||||
|
}
|
||||||
|
|
||||||
|
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
|
||||||
|
int e = 0;
|
||||||
|
if (epfd<0 || epfd>=eoks.neoks) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (fd==-1) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (event && !(event->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLOUT))) {
|
||||||
|
e = ENOTSUP;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ep_over_kq_t *eok = eoks_find(epfd);
|
||||||
|
if (!eok) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
A(0==pthread_mutex_lock(&eok->lock), "");
|
||||||
|
do {
|
||||||
|
eok_event_t* oev = eok_find_ev(eok, fd);
|
||||||
|
if (op==EPOLL_CTL_ADD && oev) {
|
||||||
|
e = EEXIST;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (op!=EPOLL_CTL_ADD && !oev) {
|
||||||
|
e = ENOENT;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct kevent64_s krev = {0};
|
||||||
|
struct kevent64_s kwev = {0};
|
||||||
|
krev.ident = -1;
|
||||||
|
kwev.ident = -1;
|
||||||
|
uint16_t flags = 0;
|
||||||
|
eok_event_t ev = {0};
|
||||||
|
ev.fd = fd;
|
||||||
|
if (event) ev.epev = *event;
|
||||||
|
struct epoll_event *pev = event;
|
||||||
|
switch (op) {
|
||||||
|
case EPOLL_CTL_ADD: {
|
||||||
|
if (!event) {
|
||||||
|
e = EINVAL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
flags = EV_ADD;
|
||||||
|
ev.changed = 1;
|
||||||
|
} break;
|
||||||
|
case EPOLL_CTL_MOD: {
|
||||||
|
if (!event) {
|
||||||
|
e = EINVAL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
flags = EV_ADD;
|
||||||
|
ev.changed = 2;
|
||||||
|
} break;
|
||||||
|
case EPOLL_CTL_DEL: {
|
||||||
|
// event is ignored
|
||||||
|
pev = &oev->epev;
|
||||||
|
flags = EV_DELETE;
|
||||||
|
ev.changed = 3;
|
||||||
|
} break;
|
||||||
|
default: {
|
||||||
|
e = ENOTSUP;
|
||||||
|
} break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (e) break;
|
||||||
|
|
||||||
|
if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
|
||||||
|
flags |= EV_EOF;
|
||||||
|
EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0);
|
||||||
|
D("....");
|
||||||
|
}
|
||||||
|
if (pev->events & EPOLLOUT) {
|
||||||
|
EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0);
|
||||||
|
D("....");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) {
|
||||||
|
e = errno;
|
||||||
|
A(e, "internal logic error");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
eok->changed = 1;
|
||||||
|
eok_wakeup(eok);
|
||||||
|
} while (0);
|
||||||
|
A(0==pthread_mutex_unlock(&eok->lock), "");
|
||||||
|
|
||||||
|
if (e) {
|
||||||
|
errno = e;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to);
|
||||||
|
|
||||||
|
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
|
||||||
|
int e = 0;
|
||||||
|
if (epfd<0 || epfd>=eoks.neoks) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (!events) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (maxevents<=0) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int r = 0;
|
||||||
|
|
||||||
|
ep_over_kq_t *eok = eoks_find(epfd);
|
||||||
|
if (!eok) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct timespec abstime = {0};
|
||||||
|
A(TIME_UTC==timespec_get(&abstime, TIME_UTC), "internal logic error");
|
||||||
|
|
||||||
|
if (timeout!=-1) {
|
||||||
|
if (timeout<0) timeout = 0;
|
||||||
|
int64_t t = abstime.tv_nsec + timeout * 1000000;
|
||||||
|
abstime.tv_sec += t / 1000000000;
|
||||||
|
abstime.tv_nsec %= 1000000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
int cnts = 0;
|
||||||
|
A(0==pthread_mutex_lock(&eok->lock), "");
|
||||||
|
do {
|
||||||
|
cnts = 0;
|
||||||
|
A(eok->waiting==0, "internal logic error");
|
||||||
|
struct kevent64_s *eventslist = eok_alloc_eventslist(eok, maxevents);
|
||||||
|
if (!eventslist) {
|
||||||
|
e = ENOMEM;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
memset(eventslist, 0, maxevents * sizeof(*eventslist));
|
||||||
|
|
||||||
|
struct timespec now = {0};
|
||||||
|
A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error");
|
||||||
|
struct timespec to = do_timespec_diff(&now, &abstime);
|
||||||
|
struct timespec *pto = NULL;
|
||||||
|
if (timeout!=-1) {
|
||||||
|
pto = &to;
|
||||||
|
}
|
||||||
|
|
||||||
|
eok->changed = 0;
|
||||||
|
eok->wakenup = 0;
|
||||||
|
eok->waiting = 1;
|
||||||
|
|
||||||
|
struct kevent64_s *kchanges = eok->kchanges;
|
||||||
|
int nchanges = eok->nchanges;
|
||||||
|
int ichanges = eok->ichanges;
|
||||||
|
eok->kchanges = NULL;
|
||||||
|
eok->nchanges = 0;
|
||||||
|
eok->ichanges = 0;
|
||||||
|
|
||||||
|
A(0==pthread_mutex_unlock(&eok->lock), "");
|
||||||
|
r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto);
|
||||||
|
e = errno;
|
||||||
|
A(0==pthread_mutex_lock(&eok->lock), "");
|
||||||
|
|
||||||
|
eok->waiting = 0;
|
||||||
|
if (kchanges) {
|
||||||
|
free(kchanges);
|
||||||
|
kchanges = NULL;
|
||||||
|
nchanges = 0;
|
||||||
|
ichanges = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
eok->waiting = 0;
|
||||||
|
if (r<0) break;
|
||||||
|
if (r==0) {
|
||||||
|
A(timeout!=-1, "internal logic error");
|
||||||
|
}
|
||||||
|
for (int i=0; i<r; ++i) {
|
||||||
|
struct kevent64_s *kev = eventslist + i;
|
||||||
|
A(kev->udata && eok->evs_head && eok->evs_tail, "internal logic error");
|
||||||
|
eok_event_t *ev = (eok_event_t*)kev->udata;
|
||||||
|
A(kev->ident == ev->fd, "internal logic error");
|
||||||
|
D("...");
|
||||||
|
switch (kev->filter) {
|
||||||
|
case EVFILT_READ: {
|
||||||
|
A((ev->epev.events & EPOLLIN), "internal logic errro");
|
||||||
|
if (ev->epev.data.ptr==&eok_dummy) {
|
||||||
|
char c = '\0';
|
||||||
|
A(1==recv(kev->ident, &c, 1, 0), "internal logic error");
|
||||||
|
A(0==memcmp(&c, "1", 1), "internal logic error");
|
||||||
|
D("...............");
|
||||||
|
} else {
|
||||||
|
if (ev->changed==3) {
|
||||||
|
D("already requested to delete");
|
||||||
|
// EV_DELETE?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
struct epoll_event pev = {0};
|
||||||
|
pev.data.ptr = ev->epev.data.ptr;
|
||||||
|
pev.events = EPOLLIN;
|
||||||
|
if (kev->flags & EV_EOF) {
|
||||||
|
pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP);
|
||||||
|
}
|
||||||
|
pev.events &= ev->epev.events;
|
||||||
|
events[cnts++] = pev;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case EVFILT_WRITE: {
|
||||||
|
A(0, "not implemented yet");
|
||||||
|
} break;
|
||||||
|
default: {
|
||||||
|
A(0, "internal logic error");
|
||||||
|
} break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (cnts==0);
|
||||||
|
A(0==pthread_mutex_unlock(&eok->lock), "");
|
||||||
|
|
||||||
|
if (e) {
|
||||||
|
errno = e;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return cnts;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct timespec do_timespec_diff(struct timespec *from, struct timespec *to) {
|
||||||
|
struct timespec delta;
|
||||||
|
delta.tv_sec = to->tv_sec - from->tv_sec;
|
||||||
|
delta.tv_nsec = to->tv_nsec - from->tv_nsec;
|
||||||
|
while (delta.tv_nsec<0) {
|
||||||
|
delta.tv_sec -= 1;
|
||||||
|
delta.tv_nsec += 1000000000;
|
||||||
|
}
|
||||||
|
return delta;
|
||||||
|
}
|
||||||
|
|
||||||
|
int epoll_close(int epfd) {
|
||||||
|
if (epfd<0 || epfd>=eoks.neoks) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ep_over_kq_t *eok = eoks_find(epfd);
|
||||||
|
if (!eok) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
A(0==pthread_mutex_lock(&eok->lock), "");
|
||||||
|
do {
|
||||||
|
A(eok->stopping==0, "internal logic error");
|
||||||
|
eok->stopping = 1;
|
||||||
|
A(eok->waiting, "internal logic error");
|
||||||
|
|
||||||
|
if (eok->kq!=-1) {
|
||||||
|
close(eok->kq);
|
||||||
|
eok->kq = -1;
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
A(0==pthread_mutex_unlock(&eok->lock), "");
|
||||||
|
eoks_free(eok);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct kevent64_s* eok_alloc_eventslist(ep_over_kq_t *eok, int maxevents) {
|
||||||
|
if (maxevents<=eok->nevslist) return eok->kevslist;
|
||||||
|
struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kevslist, sizeof(*p)*maxevents);
|
||||||
|
if (!p) return NULL;
|
||||||
|
eok->kevslist = p;
|
||||||
|
eok->nevslist = maxevents;
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
static eok_event_t* eok_find_ev(ep_over_kq_t *eok, int fd) {
|
||||||
|
eok_event_t *p = eok->evs_head;
|
||||||
|
while (p) {
|
||||||
|
if (p->fd == fd) return p;
|
||||||
|
p = p->next;
|
||||||
|
}
|
||||||
|
errno = ENOENT;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) {
|
||||||
|
eok_event_t *p = NULL;
|
||||||
|
if (eok->evs_free) {
|
||||||
|
p = eok->evs_free;
|
||||||
|
eok->evs_free = p->next;
|
||||||
|
p->next = NULL;
|
||||||
|
} else {
|
||||||
|
p = (eok_event_t*)calloc(1, sizeof(*p));
|
||||||
|
if (!p) return NULL;
|
||||||
|
}
|
||||||
|
// dirty link
|
||||||
|
p->prev = eok->evs_tail;
|
||||||
|
if (eok->evs_tail) eok->evs_tail->next = p;
|
||||||
|
else eok->evs_head = p;
|
||||||
|
eok->evs_tail = p;
|
||||||
|
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) {
|
||||||
|
if (ev->prev) ev->prev->next = ev->next;
|
||||||
|
else eok->evs_head = ev->next;
|
||||||
|
ev->prev = NULL;
|
||||||
|
if (ev->next) ev->next->prev = ev->prev;
|
||||||
|
else eok->evs_tail = ev->prev;
|
||||||
|
ev->next = eok->evs_free;
|
||||||
|
eok->evs_free = ev->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void eok_wakeup(ep_over_kq_t *eok) {
|
||||||
|
if (!eok->waiting) return;
|
||||||
|
if (eok->wakenup) return;
|
||||||
|
eok->wakenup = 1;
|
||||||
|
send(eok->sv[1], "1", 1, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev) {
|
||||||
|
if (!oev) oev = eok_calloc_ev(eok);
|
||||||
|
if (!oev) return -1;
|
||||||
|
int n = 0;
|
||||||
|
if (krev->ident==ev->fd) ++n;
|
||||||
|
if (kwev->ident==ev->fd) ++n;
|
||||||
|
A(n, "internal logic error");
|
||||||
|
if (eok->ichanges+n>eok->nchanges) {
|
||||||
|
struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kchanges, sizeof(*p) * (eok->nchanges + 10));
|
||||||
|
if (!p) {
|
||||||
|
if (ev->changed==1) {
|
||||||
|
eok_free_ev(eok, oev);
|
||||||
|
}
|
||||||
|
errno = ENOMEM;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
eok->kchanges = p;
|
||||||
|
eok->nchanges += 10;
|
||||||
|
}
|
||||||
|
|
||||||
|
oev->fd = ev->fd;
|
||||||
|
oev->epev = ev->epev;
|
||||||
|
oev->changed = ev->changed;
|
||||||
|
|
||||||
|
if (krev->ident==ev->fd) {
|
||||||
|
krev->udata = (uint64_t)oev;
|
||||||
|
eok->kchanges[eok->ichanges++] = *krev;
|
||||||
|
}
|
||||||
|
if (kwev->ident==ev->fd) {
|
||||||
|
kwev->udata = (uint64_t)oev;
|
||||||
|
eok->kchanges[eok->ichanges++] = *kwev;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ep_over_kq_t* eoks_alloc(void) {
|
||||||
|
ep_over_kq_t *eok = NULL;
|
||||||
|
|
||||||
|
A(0==pthread_mutex_lock(&eoks.lock), "");
|
||||||
|
do {
|
||||||
|
if (eoks.eoks_free) {
|
||||||
|
eok = eoks.eoks_free;
|
||||||
|
eoks.eoks_free = eok->next;
|
||||||
|
eok->next = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ep_over_kq_t *p = (ep_over_kq_t*)realloc(eoks.eoks, sizeof(*p) * (eoks.neoks+1));
|
||||||
|
if (!p) break;
|
||||||
|
eoks.eoks = p;
|
||||||
|
eok = eoks.eoks + eoks.neoks;
|
||||||
|
memset(eok, 0, sizeof(*eok));
|
||||||
|
eok->idx = eoks.neoks;
|
||||||
|
eok->kq = -1;
|
||||||
|
eok->sv[0] = -1;
|
||||||
|
eok->sv[1] = -1;
|
||||||
|
eoks.neoks += 1;
|
||||||
|
} while (0);
|
||||||
|
A(0==pthread_mutex_unlock(&eoks.lock), "");
|
||||||
|
|
||||||
|
if (!eok) return NULL;
|
||||||
|
if (eok->lock_valid) {
|
||||||
|
return eok;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0==pthread_mutex_init(&eok->lock, NULL)) {
|
||||||
|
eok->lock_valid = 1;
|
||||||
|
return eok;
|
||||||
|
}
|
||||||
|
|
||||||
|
eoks_free(eok);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void eoks_free(ep_over_kq_t *eok) {
|
||||||
|
A(0==pthread_mutex_lock(&eoks.lock), "");
|
||||||
|
do {
|
||||||
|
A(eok->next==NULL, "internal logic error");
|
||||||
|
A(eok->evs_head==NULL, "internal logic error");
|
||||||
|
A(eok->waiting==0, "internal logic error");
|
||||||
|
if (eok->sv[0]!=-1) {
|
||||||
|
close(eok->sv[0]);
|
||||||
|
eok->sv[0] = -1;
|
||||||
|
}
|
||||||
|
if (eok->sv[1]!=-1) {
|
||||||
|
close(eok->sv[1]);
|
||||||
|
eok->sv[1] = -1;
|
||||||
|
}
|
||||||
|
if (eok->kq!=-1) {
|
||||||
|
close(eok->kq);
|
||||||
|
eok->kq = -1;
|
||||||
|
}
|
||||||
|
eok->next = eoks.eoks_free;
|
||||||
|
eoks.eoks_free = eok;
|
||||||
|
} while (0);
|
||||||
|
A(0==pthread_mutex_unlock(&eoks.lock), "");
|
||||||
|
}
|
||||||
|
|
||||||
|
static ep_over_kq_t* eoks_find(int epfd) {
|
||||||
|
ep_over_kq_t *eok = NULL;
|
||||||
|
A(0==pthread_mutex_lock(&eoks.lock), "");
|
||||||
|
do {
|
||||||
|
if (epfd<0 || epfd>=eoks.neoks) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
A(eoks.eoks, "internal logic error");
|
||||||
|
eok = eoks.eoks + epfd;
|
||||||
|
A(eok->next==NULL, "internal logic error");
|
||||||
|
A(eok->lock_valid, "internal logic error");
|
||||||
|
} while (0);
|
||||||
|
A(0==pthread_mutex_unlock(&eoks.lock), "");
|
||||||
|
return eok;
|
||||||
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ void taosBlockSIGPIPE() {
|
||||||
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
|
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
|
||||||
|
|
||||||
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
|
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
|
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,4 +74,4 @@ const char *taosInetNtoa(struct in_addr ipInt) {
|
||||||
return inet_ntoa(ipInt);
|
return inet_ntoa(ipInt);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -52,7 +52,11 @@ static void httpStopThread(HttpThread* pThread) {
|
||||||
close(fd);
|
close(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
epoll_close(pThread->pollFd);
|
||||||
|
#else
|
||||||
close(pThread->pollFd);
|
close(pThread->pollFd);
|
||||||
|
#endif
|
||||||
pthread_mutex_destroy(&(pThread->threadMutex));
|
pthread_mutex_destroy(&(pThread->threadMutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
|
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP epoll", label);
|
tError("%s failed to create TCP epoll", label);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -293,6 +294,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter
|
pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP client epoll", label);
|
tError("%s failed to create TCP client epoll", label);
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
|
@ -307,7 +309,11 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
#ifdef __APPLE__
|
||||||
|
epoll_close(pThreadObj->pollFd);
|
||||||
|
#else
|
||||||
taosCloseSocket(pThreadObj->pollFd);
|
taosCloseSocket(pThreadObj->pollFd);
|
||||||
|
#endif
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
||||||
|
@ -337,6 +343,10 @@ void taosCleanUpTcpClient(void *chandle) {
|
||||||
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
||||||
SThreadObj * pThreadObj = shandle;
|
SThreadObj * pThreadObj = shandle;
|
||||||
|
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
fprintf(stderr, "pThreadObj->ip:%d\n", pThreadObj->ip);
|
||||||
|
fprintf(stderr, "PF_INET/AF_INET:%d/%d\n", PF_INET, AF_INET);
|
||||||
|
fprintf(stderr, "ip/port:%x/%d\n", ip, port);
|
||||||
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
|
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
|
||||||
if (fd < 0) return NULL;
|
if (fd < 0) return NULL;
|
||||||
|
|
||||||
|
@ -348,6 +358,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
|
||||||
localPort = (uint16_t)ntohs(sin.sin_port);
|
localPort = (uint16_t)ntohs(sin.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
|
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
|
||||||
|
|
||||||
if (pFdObj) {
|
if (pFdObj) {
|
||||||
|
@ -358,6 +369,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
|
||||||
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
|
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
|
||||||
} else {
|
} else {
|
||||||
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
|
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
taosCloseSocket(fd);
|
taosCloseSocket(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,27 +492,32 @@ static void *taosProcessTcpData(void *param) {
|
||||||
if (fdNum < 0) continue;
|
if (fdNum < 0) continue;
|
||||||
|
|
||||||
for (int i = 0; i < fdNum; ++i) {
|
for (int i = 0; i < fdNum; ++i) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
pFdObj = events[i].data.ptr;
|
pFdObj = events[i].data.ptr;
|
||||||
|
|
||||||
if (events[i].events & EPOLLERR) {
|
if (events[i].events & EPOLLERR) {
|
||||||
tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
|
tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
taosReportBrokenLink(pFdObj);
|
taosReportBrokenLink(pFdObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events[i].events & EPOLLRDHUP) {
|
if (events[i].events & EPOLLRDHUP) {
|
||||||
tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
taosReportBrokenLink(pFdObj);
|
taosReportBrokenLink(pFdObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events[i].events & EPOLLHUP) {
|
if (events[i].events & EPOLLHUP) {
|
||||||
tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
taosReportBrokenLink(pFdObj);
|
taosReportBrokenLink(pFdObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
|
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
shutdown(pFdObj->fd, SHUT_WR);
|
shutdown(pFdObj->fd, SHUT_WR);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -512,7 +529,11 @@ static void *taosProcessTcpData(void *param) {
|
||||||
if (pThreadObj->stop) break;
|
if (pThreadObj->stop) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
if (pThreadObj->pollFd >=0) epoll_close(pThreadObj->pollFd);
|
||||||
|
#else
|
||||||
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
||||||
|
#endif
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
while (pThreadObj->pHead) {
|
||||||
SFdObj *pFdObj = pThreadObj->pHead;
|
SFdObj *pFdObj = pThreadObj->pHead;
|
||||||
|
@ -542,6 +563,7 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
|
||||||
|
|
||||||
event.events = EPOLLIN | EPOLLRDHUP;
|
event.events = EPOLLIN | EPOLLRDHUP;
|
||||||
event.data.ptr = pFdObj;
|
event.data.ptr = pFdObj;
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
|
||||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||||
tfree(pFdObj);
|
tfree(pFdObj);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -136,12 +136,15 @@ void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
|
||||||
event.events = EPOLLIN | EPOLLRDHUP;
|
event.events = EPOLLIN | EPOLLRDHUP;
|
||||||
event.data.ptr = pConn;
|
event.data.ptr = pConn;
|
||||||
|
|
||||||
|
fprintf(stderr, ">>>>>>>>>>>>>>>>>\n");
|
||||||
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
||||||
|
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
|
||||||
sError("failed to add fd:%d since %s", connFd, strerror(errno));
|
sError("failed to add fd:%d since %s", connFd, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tfree(pConn);
|
tfree(pConn);
|
||||||
pConn = NULL;
|
pConn = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
|
||||||
pThread->numOfFds++;
|
pThread->numOfFds++;
|
||||||
sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
|
sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
|
||||||
}
|
}
|
||||||
|
@ -167,7 +170,9 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
|
||||||
(*pInfo->processBrokenLink)(pConn->handleId);
|
(*pInfo->processBrokenLink)(pConn->handleId);
|
||||||
|
|
||||||
pThread->numOfFds--;
|
pThread->numOfFds--;
|
||||||
|
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
|
||||||
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
|
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
|
||||||
|
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
|
||||||
sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
|
sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
|
||||||
taosClose(pConn->fd);
|
taosClose(pConn->fd);
|
||||||
tfree(pConn);
|
tfree(pConn);
|
||||||
|
@ -233,7 +238,11 @@ static void *syncProcessTcpData(void *param) {
|
||||||
|
|
||||||
sDebug("%p TCP epoll thread exits", pThread);
|
sDebug("%p TCP epoll thread exits", pThread);
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
epoll_close(pThread->pollFd);
|
||||||
|
#else
|
||||||
close(pThread->pollFd);
|
close(pThread->pollFd);
|
||||||
|
#endif
|
||||||
tfree(pThread);
|
tfree(pThread);
|
||||||
tfree(buffer);
|
tfree(buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -278,6 +287,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
|
||||||
|
|
||||||
pThread->pPool = pPool;
|
pThread->pPool = pPool;
|
||||||
pThread->pollFd = epoll_create(10); // size does not matter
|
pThread->pollFd = epoll_create(10); // size does not matter
|
||||||
|
fprintf(stderr, "...............\n");
|
||||||
if (pThread->pollFd < 0) {
|
if (pThread->pollFd < 0) {
|
||||||
tfree(pThread);
|
tfree(pThread);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -290,7 +300,11 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
#ifdef __APPLE__
|
||||||
|
epoll_close(pThread->pollFd);
|
||||||
|
#else
|
||||||
close(pThread->pollFd);
|
close(pThread->pollFd);
|
||||||
|
#endif
|
||||||
tfree(pThread);
|
tfree(pThread);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,11 @@ typedef struct {
|
||||||
SMemTable* mem;
|
SMemTable* mem;
|
||||||
SMemTable* imem;
|
SMemTable* imem;
|
||||||
STsdbFileH* tsdbFileH;
|
STsdbFileH* tsdbFileH;
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_t *readyToCommit;
|
||||||
|
#else
|
||||||
sem_t readyToCommit;
|
sem_t readyToCommit;
|
||||||
|
#endif
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
bool repoLocked;
|
bool repoLocked;
|
||||||
int32_t code; // Commit code
|
int32_t code; // Commit code
|
||||||
|
@ -616,4 +620,4 @@ int tsdbScheduleCommit(STsdbRepo *pRepo);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -166,7 +166,11 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
||||||
pRepo->imem = NULL;
|
pRepo->imem = NULL;
|
||||||
tsdbUnlockRepo(pRepo);
|
tsdbUnlockRepo(pRepo);
|
||||||
tsdbUnRefMemTable(pRepo, pIMem);
|
tsdbUnRefMemTable(pRepo, pIMem);
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_post(pRepo->readyToCommit);
|
||||||
|
#else
|
||||||
sem_post(&(pRepo->readyToCommit));
|
sem_post(&(pRepo->readyToCommit));
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||||
|
|
|
@ -146,7 +146,11 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
||||||
|
|
||||||
if (toCommit) {
|
if (toCommit) {
|
||||||
tsdbAsyncCommit(pRepo);
|
tsdbAsyncCommit(pRepo);
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_wait(pRepo->readyToCommit);
|
||||||
|
#else
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
|
#endif
|
||||||
terrno = pRepo->code;
|
terrno = pRepo->code;
|
||||||
}
|
}
|
||||||
tsdbUnRefMemTable(pRepo, pRepo->mem);
|
tsdbUnRefMemTable(pRepo, pRepo->mem);
|
||||||
|
@ -643,11 +647,19 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1);
|
||||||
|
if (!pRepo->readyToCommit) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
#else
|
||||||
code = sem_init(&(pRepo->readyToCommit), 0, 1);
|
code = sem_init(&(pRepo->readyToCommit), 0, 1);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
pRepo->repoLocked = false;
|
pRepo->repoLocked = false;
|
||||||
|
|
||||||
|
@ -693,7 +705,11 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
|
||||||
// tsdbFreeMemTable(pRepo->mem);
|
// tsdbFreeMemTable(pRepo->mem);
|
||||||
// tsdbFreeMemTable(pRepo->imem);
|
// tsdbFreeMemTable(pRepo->imem);
|
||||||
tfree(pRepo->rootDir);
|
tfree(pRepo->rootDir);
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_close(pRepo->readyToCommit);
|
||||||
|
#else
|
||||||
sem_destroy(&(pRepo->readyToCommit));
|
sem_destroy(&(pRepo->readyToCommit));
|
||||||
|
#endif
|
||||||
pthread_mutex_destroy(&pRepo->mutex);
|
pthread_mutex_destroy(&pRepo->mutex);
|
||||||
free(pRepo);
|
free(pRepo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,7 +207,11 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
if (pRepo->mem == NULL) return 0;
|
if (pRepo->mem == NULL) return 0;
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_wait(pRepo->readyToCommit);
|
||||||
|
#else
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
|
#endif
|
||||||
|
|
||||||
ASSERT(pRepo->imem == NULL);
|
ASSERT(pRepo->imem == NULL);
|
||||||
|
|
||||||
|
@ -229,8 +233,13 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
|
||||||
tsdbAsyncCommit(pRepo);
|
tsdbAsyncCommit(pRepo);
|
||||||
|
#ifdef __APPLE__
|
||||||
|
sem_wait(pRepo->readyToCommit);
|
||||||
|
sem_post(pRepo->readyToCommit);
|
||||||
|
#else
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
sem_post(&(pRepo->readyToCommit));
|
sem_post(&(pRepo->readyToCommit));
|
||||||
|
#endif
|
||||||
|
|
||||||
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = pRepo->code;
|
terrno = pRepo->code;
|
||||||
|
@ -927,4 +936,4 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
hints.ai_flags = AI_CANONNAME;
|
hints.ai_flags = AI_CANONNAME;
|
||||||
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
|
uError("failed to get fqdn for hostname <%s>, code:%d, reason:%s", hostname, ret, gai_strerror(ret));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,6 +341,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef __APPLE__
|
||||||
int32_t probes = 3;
|
int32_t probes = 3;
|
||||||
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
|
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
|
||||||
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
|
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
|
||||||
|
@ -361,6 +362,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
|
||||||
taosCloseSocket(sockFd);
|
taosCloseSocket(sockFd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t nodelay = 1;
|
int32_t nodelay = 1;
|
||||||
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
|
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
|
||||||
|
|
|
@ -6,3 +6,8 @@ IF (TD_LINUX)
|
||||||
ADD_EXECUTABLE(demo demo.c)
|
ADD_EXECUTABLE(demo demo.c)
|
||||||
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
|
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
IF (TD_DARWIN)
|
||||||
|
INCLUDE_DIRECTORIES(. ${TD_COMMUNITY_DIR}/src/inc ${TD_COMMUNITY_DIR}/src/client/inc ${TD_COMMUNITY_DIR}/inc)
|
||||||
|
ADD_EXECUTABLE(epoll epoll.c)
|
||||||
|
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread )
|
||||||
|
ENDIF ()
|
||||||
|
|
Loading…
Reference in New Issue