Merge remote-tracking branch 'vbox/mac' into mac
This commit is contained in:
commit
6edc669fd3
|
@ -295,6 +295,10 @@ void taos_close(TAOS *taos) {
|
||||||
|
|
||||||
tscDebug("%p HB is freed", pHb);
|
tscDebug("%p HB is freed", pHb);
|
||||||
taosReleaseRef(tscObjRef, pHb->self);
|
taosReleaseRef(tscObjRef, pHb->self);
|
||||||
|
#ifdef __APPLE__
|
||||||
|
// to satisfy later tsem_destroy in taos_free_result
|
||||||
|
tsem_init(&pHb->rspSem, 0, 0);
|
||||||
|
#endif // __APPLE__
|
||||||
taos_free_result(pHb);
|
taos_free_result(pHb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1933,6 +1933,10 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
|
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
|
||||||
|
#ifdef __APPLE__
|
||||||
|
// to satisfy later tsem_destroy in taos_free_result
|
||||||
|
tsem_init(&pNew->rspSem, 0, 0);
|
||||||
|
#endif // __APPLE__
|
||||||
tscFreeSqlObj(pNew);
|
tscFreeSqlObj(pNew);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2501,9 +2505,9 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
|
||||||
// set the sql object owner
|
// set the sql object owner
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
pthread_t threadId = (pthread_t)taosGetSelfPthreadId();
|
pthread_t threadId = (pthread_t)taosGetSelfPthreadId();
|
||||||
#else
|
#else // __APPLE__
|
||||||
uint64_t threadId = taosGetSelfPthreadId();
|
uint64_t threadId = taosGetSelfPthreadId();
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
||||||
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -164,7 +164,7 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
|
||||||
dInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
|
dInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
|
||||||
#else // __APPLE__
|
#else // __APPLE__
|
||||||
dInfo("shut down signal is %d, sender PID:%d cmdline:%s", signum, sigInfo->si_pid, taosGetCmdlineByPID(sigInfo->si_pid));
|
dInfo("shut down signal is %d, sender PID:%d cmdline:%s", signum, sigInfo->si_pid, taosGetCmdlineByPID(sigInfo->si_pid));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
// protect the application from receive another signal
|
// protect the application from receive another signal
|
||||||
struct sigaction act = {{0}};
|
struct sigaction act = {{0}};
|
||||||
|
|
|
@ -241,7 +241,7 @@ static int sem_timedwait(tsem_t *sem, struct timespec *to) {
|
||||||
fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__);
|
fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__);
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
static void* telemetryThread(void* param) {
|
static void* telemetryThread(void* param) {
|
||||||
struct timespec end = {0};
|
struct timespec end = {0};
|
||||||
|
|
|
@ -145,6 +145,7 @@ bool taosGetSystemUid(char *uid) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#endif // __APPLE__
|
#endif // __APPLE__
|
||||||
|
|
||||||
static int32_t mnodeCreateCluster() {
|
static int32_t mnodeCreateCluster() {
|
||||||
int32_t numOfClusters = sdbGetNumOfRows(tsClusterSdb);
|
int32_t numOfClusters = sdbGetNumOfRows(tsClusterSdb);
|
||||||
if (numOfClusters != 0) return TSDB_CODE_SUCCESS;
|
if (numOfClusters != 0) return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -75,11 +75,11 @@ extern "C" {
|
||||||
#define TAOS_OS_FUNC_FILE_SENDIFLE
|
#define TAOS_OS_FUNC_FILE_SENDIFLE
|
||||||
|
|
||||||
#define TAOS_OS_FUNC_SEMPHONE
|
#define TAOS_OS_FUNC_SEMPHONE
|
||||||
#define tsem_t dispatch_semaphore_t
|
typedef struct tsem_s *tsem_t;
|
||||||
int tsem_init(dispatch_semaphore_t *sem, int pshared, unsigned int value);
|
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
|
||||||
int tsem_wait(dispatch_semaphore_t *sem);
|
int tsem_wait(tsem_t *sem);
|
||||||
int tsem_post(dispatch_semaphore_t *sem);
|
int tsem_post(tsem_t *sem);
|
||||||
int tsem_destroy(dispatch_semaphore_t *sem);
|
int tsem_destroy(tsem_t *sem);
|
||||||
|
|
||||||
#define TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
|
#define TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
|
||||||
#define TAOS_OS_FUNC_STRING_STR2INT64
|
#define TAOS_OS_FUNC_STRING_STR2INT64
|
||||||
|
|
|
@ -90,11 +90,12 @@ extern "C" {
|
||||||
#ifdef _ISOC11_SOURCE
|
#ifdef _ISOC11_SOURCE
|
||||||
#define threadlocal _Thread_local
|
#define threadlocal _Thread_local
|
||||||
#elif defined(__APPLE__)
|
#elif defined(__APPLE__)
|
||||||
#define threadlocal
|
#define threadlocal __thread
|
||||||
#elif defined(__GNUC__) && !defined(threadlocal)
|
#elif defined(__GNUC__) && !defined(threadlocal)
|
||||||
#define threadlocal __thread
|
#define threadlocal __thread
|
||||||
#else
|
#else
|
||||||
#define threadlocal
|
// #define threadlocal
|
||||||
|
#error please follow with the target platform's thread-local implementation
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -13,28 +13,269 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// fail-fast or let-it-crash philosophy
|
||||||
|
// https://en.wikipedia.org/wiki/Fail-fast
|
||||||
|
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
|
||||||
|
// experimentally, we follow log-and-crash here
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
int tsem_init(dispatch_semaphore_t *sem, int pshared, unsigned int value) {
|
// #define SEM_USE_PTHREAD
|
||||||
*sem = dispatch_semaphore_create(value);
|
// #define SEM_USE_POSIX
|
||||||
if (*sem == NULL) {
|
#define SEM_USE_SEM
|
||||||
return -1;
|
|
||||||
} else {
|
#ifdef SEM_USE_SEM
|
||||||
return 0;
|
#include <mach/mach_init.h>
|
||||||
|
#include <mach/mach_error.h>
|
||||||
|
#include <mach/semaphore.h>
|
||||||
|
#include <mach/task.h>
|
||||||
|
|
||||||
|
static pthread_t sem_thread;
|
||||||
|
static pthread_once_t sem_once;
|
||||||
|
static task_t sem_port;
|
||||||
|
static volatile int sem_inited = 0;
|
||||||
|
static semaphore_t sem_exit;
|
||||||
|
|
||||||
|
static void* sem_thread_routine(void *arg) {
|
||||||
|
(void)arg;
|
||||||
|
sem_port = mach_task_self();
|
||||||
|
kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
|
||||||
|
if (ret != KERN_SUCCESS) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
sem_inited = -1;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
sem_inited = 1;
|
||||||
|
semaphore_wait(sem_exit);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void once_init(void) {
|
||||||
|
int r = 0;
|
||||||
|
r = pthread_create(&sem_thread, NULL, sem_thread_routine, NULL);
|
||||||
|
if (r) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
while (sem_inited==0) {
|
||||||
|
;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
struct tsem_s {
|
||||||
|
#ifdef SEM_USE_PTHREAD
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
volatile int64_t val;
|
||||||
|
#elif defined(SEM_USE_POSIX)
|
||||||
|
size_t id;
|
||||||
|
sem_t *sem;
|
||||||
|
#elif defined(SEM_USE_SEM)
|
||||||
|
semaphore_t sem;
|
||||||
|
#else // SEM_USE_PTHREAD
|
||||||
|
dispatch_semaphore_t sem;
|
||||||
|
#endif // SEM_USE_PTHREAD
|
||||||
|
|
||||||
|
volatile unsigned int valid:1;
|
||||||
|
};
|
||||||
|
|
||||||
|
int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
|
||||||
|
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
if (*sem) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
struct tsem_s *p = (struct tsem_s*)calloc(1, sizeof(*p));
|
||||||
|
if (!p) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef SEM_USE_PTHREAD
|
||||||
|
int r = pthread_mutex_init(&p->lock, NULL);
|
||||||
|
do {
|
||||||
|
if (r) break;
|
||||||
|
r = pthread_cond_init(&p->cond, NULL);
|
||||||
|
if (r) {
|
||||||
|
pthread_mutex_destroy(&p->lock);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
p->val = value;
|
||||||
|
} while (0);
|
||||||
|
if (r) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#elif defined(SEM_USE_POSIX)
|
||||||
|
static size_t tick = 0;
|
||||||
|
do {
|
||||||
|
size_t id = atomic_add_fetch_64(&tick, 1);
|
||||||
|
if (id==SEM_VALUE_MAX) {
|
||||||
|
atomic_store_64(&tick, 0);
|
||||||
|
id = 0;
|
||||||
|
}
|
||||||
|
char name[NAME_MAX-4];
|
||||||
|
snprintf(name, sizeof(name), "/t%ld", id);
|
||||||
|
p->sem = sem_open(name, O_CREAT|O_EXCL, pshared, value);
|
||||||
|
p->id = id;
|
||||||
|
if (p->sem!=SEM_FAILED) break;
|
||||||
|
int e = errno;
|
||||||
|
if (e==EEXIST) continue;
|
||||||
|
if (e==EINTR) continue;
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e));
|
||||||
|
abort();
|
||||||
|
} while (p->sem==SEM_FAILED);
|
||||||
|
#elif defined(SEM_USE_SEM)
|
||||||
|
pthread_once(&sem_once, once_init);
|
||||||
|
if (sem_inited!=1) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
errno = ENOMEM;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, 0);
|
||||||
|
if (ret != KERN_SUCCESS) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
// we fail-fast here, because we have less-doc about semaphore_create for the moment
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#else // SEM_USE_PTHREAD
|
||||||
|
p->sem = dispatch_semaphore_create(value);
|
||||||
|
if (p->sem == NULL) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#endif // SEM_USE_PTHREAD
|
||||||
|
|
||||||
|
p->valid = 1;
|
||||||
|
|
||||||
|
*sem = p;
|
||||||
|
|
||||||
int tsem_wait(dispatch_semaphore_t *sem) {
|
|
||||||
dispatch_semaphore_wait(*sem, DISPATCH_TIME_FOREVER);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsem_post(dispatch_semaphore_t *sem) {
|
int tsem_wait(tsem_t *sem) {
|
||||||
dispatch_semaphore_signal(*sem);
|
if (!*sem) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
struct tsem_s *p = *sem;
|
||||||
|
if (!p->valid) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#ifdef SEM_USE_PTHREAD
|
||||||
|
if (pthread_mutex_lock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
p->val -= 1;
|
||||||
|
if (p->val < 0) {
|
||||||
|
if (pthread_cond_wait(&p->cond, &p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pthread_mutex_unlock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
#elif defined(SEM_USE_POSIX)
|
||||||
|
return sem_wait(p->sem);
|
||||||
|
#elif defined(SEM_USE_SEM)
|
||||||
|
return semaphore_wait(p->sem);
|
||||||
|
#else // SEM_USE_PTHREAD
|
||||||
|
return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
|
||||||
|
#endif // SEM_USE_PTHREAD
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsem_post(tsem_t *sem) {
|
||||||
|
if (!*sem) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
struct tsem_s *p = *sem;
|
||||||
|
if (!p->valid) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#ifdef SEM_USE_PTHREAD
|
||||||
|
if (pthread_mutex_lock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
p->val += 1;
|
||||||
|
if (p->val <= 0) {
|
||||||
|
if (pthread_cond_signal(&p->cond)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pthread_mutex_unlock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
#elif defined(SEM_USE_POSIX)
|
||||||
|
return sem_post(p->sem);
|
||||||
|
#elif defined(SEM_USE_SEM)
|
||||||
|
return semaphore_signal(p->sem);
|
||||||
|
#else // SEM_USE_PTHREAD
|
||||||
|
return dispatch_semaphore_signal(p->sem);
|
||||||
|
#endif // SEM_USE_PTHREAD
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsem_destroy(tsem_t *sem) {
|
||||||
|
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
if (!*sem) {
|
||||||
|
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
// abort();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
struct tsem_s *p = *sem;
|
||||||
|
if (!p->valid) {
|
||||||
|
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
// abort();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#ifdef SEM_USE_PTHREAD
|
||||||
|
if (pthread_mutex_lock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
p->valid = 0;
|
||||||
|
if (pthread_cond_destroy(&p->cond)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
if (pthread_mutex_unlock(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
if (pthread_mutex_destroy(&p->lock)) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#elif defined(SEM_USE_POSIX)
|
||||||
|
char name[NAME_MAX-4];
|
||||||
|
snprintf(name, sizeof(name), "/t%ld", p->id);
|
||||||
|
int r = sem_unlink(name);
|
||||||
|
if (r) {
|
||||||
|
int e = errno;
|
||||||
|
fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e));
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
#elif defined(SEM_USE_SEM)
|
||||||
|
semaphore_destroy(sem_port, p->sem);
|
||||||
|
#else // SEM_USE_PTHREAD
|
||||||
|
#endif // SEM_USE_PTHREAD
|
||||||
|
|
||||||
|
p->valid = 0;
|
||||||
|
free(p);
|
||||||
|
|
||||||
|
*sem = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsem_destroy(dispatch_semaphore_t *sem) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,9 +13,82 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// fail-fast or let-it-crash philosophy
|
||||||
|
// https://en.wikipedia.org/wiki/Fail-fast
|
||||||
|
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
|
||||||
|
// experimentally, we follow log-and-crash here
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
#include <sys/event.h>
|
||||||
|
|
||||||
|
static void (*timer_callback)(int);
|
||||||
|
static int timer_ms = 0;
|
||||||
|
static pthread_t timer_thread;
|
||||||
|
static int timer_kq = -1;
|
||||||
|
static volatile int timer_stop = 0;
|
||||||
|
|
||||||
|
static void* timer_routine(void *arg) {
|
||||||
|
(void)arg;
|
||||||
|
|
||||||
|
int r = 0;
|
||||||
|
struct timespec to = {0};
|
||||||
|
to.tv_sec = timer_ms / 1000;
|
||||||
|
to.tv_nsec = (timer_ms % 1000) * 1000000;
|
||||||
|
while (!timer_stop) {
|
||||||
|
struct kevent64_s kev[10] = {0};
|
||||||
|
r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to);
|
||||||
|
if (r!=0) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
timer_callback(SIGALRM); // just mock
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int taosInitTimer(void (*callback)(int), int ms) {
|
||||||
|
int r = 0;
|
||||||
|
timer_ms = ms;
|
||||||
|
timer_callback = callback;
|
||||||
|
|
||||||
|
timer_kq = kqueue();
|
||||||
|
if (timer_kq==-1) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
// since no caller of this func checks the return value for the moment
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
r = pthread_create(&timer_thread, NULL, timer_routine, NULL);
|
||||||
|
if (r) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==failed to create timer thread\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
// since no caller of this func checks the return value for the moment
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosUninitTimer() {
|
||||||
|
int r = 0;
|
||||||
|
timer_stop = 1;
|
||||||
|
r = pthread_join(timer_thread, NULL);
|
||||||
|
if (r) {
|
||||||
|
fprintf(stderr, "==%s[%d]%s()==failed to join timer thread\n", basename(__FILE__), __LINE__, __func__);
|
||||||
|
// since no caller of this func checks the return value for the moment
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
close(timer_kq);
|
||||||
|
timer_kq = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taos_block_sigalrm(void) {
|
||||||
|
// we don't know if there's any specific API for SIGALRM to deliver to specific thread
|
||||||
|
// this implementation relies on kqueue rather than SIGALRM
|
||||||
|
}
|
||||||
|
#else
|
||||||
int taosInitTimer(void (*callback)(int), int ms) {
|
int taosInitTimer(void (*callback)(int), int ms) {
|
||||||
signal(SIGALRM, callback);
|
signal(SIGALRM, callback);
|
||||||
|
|
||||||
|
@ -46,4 +119,5 @@ void taos_block_sigalrm(void) {
|
||||||
already_set = 1;
|
already_set = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -13,13 +13,18 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// fail-fast or let-it-crash philosophy
|
||||||
|
// https://en.wikipedia.org/wiki/Fail-fast
|
||||||
|
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
|
||||||
|
// experimentally, we follow log-and-crash here
|
||||||
|
|
||||||
#include "eok.h"
|
#include "eok.h"
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include <sys/event.h>
|
#include <sys/event.h>
|
||||||
|
|
||||||
#define LET_IT_BE
|
// #define BALANCE_CHECK_WHEN_CLOSE
|
||||||
|
|
||||||
#ifdef ENABLE_LOG
|
#ifdef ENABLE_LOG
|
||||||
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
|
||||||
|
@ -99,16 +104,16 @@ struct eok_event_s {
|
||||||
typedef struct eoks_s eoks_t;
|
typedef struct eoks_s eoks_t;
|
||||||
struct eoks_s {
|
struct eoks_s {
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
ep_over_kq_t **eoks; // note: this memory leaks when process terminates
|
ep_over_kq_t **eoks; // note: this memory leaks when process terminates
|
||||||
int neoks; // we can add an extra api to let user clean
|
int neoks; // we can add an extra api to let user clean
|
||||||
ep_over_kq_t *eoks_free; // currently, we just keep it simple stupid
|
ep_over_kq_t *eoks_free_list; // currently, we just keep it simple stupid
|
||||||
};
|
};
|
||||||
|
|
||||||
static eoks_t eoks = {
|
static eoks_t eoks = {
|
||||||
.lock = PTHREAD_MUTEX_INITIALIZER,
|
.lock = PTHREAD_MUTEX_INITIALIZER,
|
||||||
.eoks = NULL,
|
.eoks = NULL,
|
||||||
.neoks = 0,
|
.neoks = 0,
|
||||||
.eoks_free = NULL,
|
.eoks_free_list = NULL,
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef ENABLE_LOG
|
#ifdef ENABLE_LOG
|
||||||
|
@ -760,9 +765,9 @@ static ep_over_kq_t* eoks_alloc(void) {
|
||||||
|
|
||||||
A(0==pthread_mutex_lock(&eoks.lock), "");
|
A(0==pthread_mutex_lock(&eoks.lock), "");
|
||||||
do {
|
do {
|
||||||
if (eoks.eoks_free) {
|
if (eoks.eoks_free_list) {
|
||||||
eok = eoks.eoks_free;
|
eok = eoks.eoks_free_list;
|
||||||
eoks.eoks_free = eok->next;
|
eoks.eoks_free_list = eok->next;
|
||||||
A(eoks.eoks, "internal logic error");
|
A(eoks.eoks, "internal logic error");
|
||||||
A(eok->idx>=0 && eok->idx<eoks.neoks, "internal logic error");
|
A(eok->idx>=0 && eok->idx<eoks.neoks, "internal logic error");
|
||||||
A(*(eoks.eoks + eok->idx)==NULL, "internal logic error");
|
A(*(eoks.eoks + eok->idx)==NULL, "internal logic error");
|
||||||
|
@ -820,10 +825,12 @@ static void eoks_free(ep_over_kq_t *eok) {
|
||||||
|
|
||||||
A(eok->waiting==0, "internal logic error");
|
A(eok->waiting==0, "internal logic error");
|
||||||
eok_event_t *ev = eok->evs_head;
|
eok_event_t *ev = eok->evs_head;
|
||||||
|
int sv_closed = 0;
|
||||||
while (ev) {
|
while (ev) {
|
||||||
eok_event_t *next = ev->next;
|
eok_event_t *next = ev->next;
|
||||||
if (ev->fd==eok->sv[0]) {
|
if (ev->fd==eok->sv[0]) {
|
||||||
// fd is critical system resource
|
// fd is critical system resource
|
||||||
|
A(sv_closed==0, "internal logic error");
|
||||||
close(eok->sv[0]);
|
close(eok->sv[0]);
|
||||||
eok->sv[0] = -1;
|
eok->sv[0] = -1;
|
||||||
close(eok->sv[1]);
|
close(eok->sv[1]);
|
||||||
|
@ -832,11 +839,11 @@ static void eoks_free(ep_over_kq_t *eok) {
|
||||||
} else {
|
} else {
|
||||||
// user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close?
|
// user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close?
|
||||||
// calling close(ev->fd) here smells really bad
|
// calling close(ev->fd) here smells really bad
|
||||||
#ifdef LET_IT_BE
|
#ifndef BALANCE_CHECK_WHEN_CLOSE
|
||||||
// we just let it be and reclaim ev
|
// we just let it be and reclaim ev
|
||||||
eok_free_ev(eok, ev);
|
eok_free_ev(eok, ev);
|
||||||
#else
|
#else
|
||||||
// panic otherwise, if LET_IT_BE not defined
|
// panic otherwise, if BALANCE_CHECK_WHEN_CLOSE is defined
|
||||||
A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0,
|
A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0,
|
||||||
"epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?",
|
"epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?",
|
||||||
eok->idx, ev->fd);
|
eok->idx, ev->fd);
|
||||||
|
@ -861,8 +868,8 @@ static void eoks_free(ep_over_kq_t *eok) {
|
||||||
close(eok->kq);
|
close(eok->kq);
|
||||||
eok->kq = -1;
|
eok->kq = -1;
|
||||||
}
|
}
|
||||||
eok->next = eoks.eoks_free;
|
eok->next = eoks.eoks_free_list;
|
||||||
eoks.eoks_free = eok;
|
eoks.eoks_free_list = eok;
|
||||||
*(eoks.eoks + eok->idx) = NULL;
|
*(eoks.eoks + eok->idx) = NULL;
|
||||||
} while (0);
|
} while (0);
|
||||||
A(0==pthread_mutex_unlock(&eoks.lock), "");
|
A(0==pthread_mutex_unlock(&eoks.lock), "");
|
||||||
|
|
|
@ -33,7 +33,7 @@ static bool httpReadData(HttpContext *pContext);
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
static int sv_dummy = 0;
|
static int sv_dummy = 0;
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
static void httpStopThread(HttpThread* pThread) {
|
static void httpStopThread(HttpThread* pThread) {
|
||||||
pThread->stop = true;
|
pThread->stop = true;
|
||||||
|
|
|
@ -312,9 +312,9 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
epoll_close(pThreadObj->pollFd);
|
epoll_close(pThreadObj->pollFd);
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
}
|
}
|
||||||
#else
|
#else // __APPLE__
|
||||||
taosCloseSocket(pThreadObj->pollFd);
|
taosCloseSocket(pThreadObj->pollFd);
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
||||||
|
@ -477,7 +477,10 @@ static void *taosProcessTcpData(void *param) {
|
||||||
SFdObj *pFdObj;
|
SFdObj *pFdObj;
|
||||||
struct epoll_event events[maxEvents];
|
struct epoll_event events[maxEvents];
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
taos_block_sigalrm();
|
||||||
|
#endif // __APPLE__
|
||||||
while (1) {
|
while (1) {
|
||||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
|
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
|
||||||
if (pThreadObj->stop) {
|
if (pThreadObj->stop) {
|
||||||
|
@ -524,9 +527,9 @@ static void *taosProcessTcpData(void *param) {
|
||||||
epoll_close(pThreadObj->pollFd);
|
epoll_close(pThreadObj->pollFd);
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
}
|
}
|
||||||
#else
|
#else // __APPLE__
|
||||||
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
while (pThreadObj->pHead) {
|
||||||
SFdObj *pFdObj = pThreadObj->pHead;
|
SFdObj *pFdObj = pThreadObj->pHead;
|
||||||
|
|
|
@ -235,9 +235,9 @@ static void *syncProcessTcpData(void *param) {
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
epoll_close(pThread->pollFd);
|
epoll_close(pThread->pollFd);
|
||||||
#else
|
#else // __APPLE__
|
||||||
close(pThread->pollFd);
|
close(pThread->pollFd);
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
tfree(pThread);
|
tfree(pThread);
|
||||||
tfree(buffer);
|
tfree(buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -296,9 +296,9 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
epoll_close(pThread->pollFd);
|
epoll_close(pThread->pollFd);
|
||||||
#else
|
#else // __APPLE__
|
||||||
close(pThread->pollFd);
|
close(pThread->pollFd);
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
tfree(pThread);
|
tfree(pThread);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,9 +235,9 @@ typedef struct {
|
||||||
STsdbFileH* tsdbFileH;
|
STsdbFileH* tsdbFileH;
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_t *readyToCommit;
|
sem_t *readyToCommit;
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_t readyToCommit;
|
sem_t readyToCommit;
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
bool repoLocked;
|
bool repoLocked;
|
||||||
int32_t code; // Commit code
|
int32_t code; // Commit code
|
||||||
|
|
|
@ -168,9 +168,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
||||||
tsdbUnRefMemTable(pRepo, pIMem);
|
tsdbUnRefMemTable(pRepo, pIMem);
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_post(pRepo->readyToCommit);
|
sem_post(pRepo->readyToCommit);
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_post(&(pRepo->readyToCommit));
|
sem_post(&(pRepo->readyToCommit));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||||
|
|
|
@ -148,9 +148,9 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
||||||
tsdbAsyncCommit(pRepo);
|
tsdbAsyncCommit(pRepo);
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_wait(pRepo->readyToCommit);
|
sem_wait(pRepo->readyToCommit);
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
terrno = pRepo->code;
|
terrno = pRepo->code;
|
||||||
}
|
}
|
||||||
tsdbUnRefMemTable(pRepo, pRepo->mem);
|
tsdbUnRefMemTable(pRepo, pRepo->mem);
|
||||||
|
@ -654,14 +654,14 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
#else
|
#else // __APPLE__
|
||||||
code = sem_init(&(pRepo->readyToCommit), 0, 1);
|
code = sem_init(&(pRepo->readyToCommit), 0, 1);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = errno;
|
code = errno;
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
pRepo->repoLocked = false;
|
pRepo->repoLocked = false;
|
||||||
|
|
||||||
|
@ -709,9 +709,9 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
|
||||||
tfree(pRepo->rootDir);
|
tfree(pRepo->rootDir);
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_close(pRepo->readyToCommit);
|
sem_close(pRepo->readyToCommit);
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_destroy(&(pRepo->readyToCommit));
|
sem_destroy(&(pRepo->readyToCommit));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
pthread_mutex_destroy(&pRepo->mutex);
|
pthread_mutex_destroy(&pRepo->mutex);
|
||||||
free(pRepo);
|
free(pRepo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,9 +209,9 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_wait(pRepo->readyToCommit);
|
sem_wait(pRepo->readyToCommit);
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
ASSERT(pRepo->imem == NULL);
|
ASSERT(pRepo->imem == NULL);
|
||||||
|
|
||||||
|
@ -236,10 +236,10 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) {
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
sem_wait(pRepo->readyToCommit);
|
sem_wait(pRepo->readyToCommit);
|
||||||
sem_post(pRepo->readyToCommit);
|
sem_post(pRepo->readyToCommit);
|
||||||
#else
|
#else // __APPLE__
|
||||||
sem_wait(&(pRepo->readyToCommit));
|
sem_wait(&(pRepo->readyToCommit));
|
||||||
sem_post(&(pRepo->readyToCommit));
|
sem_post(&(pRepo->readyToCommit));
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
if (pRepo->code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = pRepo->code;
|
terrno = pRepo->code;
|
||||||
|
|
|
@ -376,7 +376,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
|
||||||
taosCloseSocket(sockFd);
|
taosCloseSocket(sockFd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
|
|
||||||
int32_t nodelay = 1;
|
int32_t nodelay = 1;
|
||||||
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
|
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
|
||||||
|
|
|
@ -15,9 +15,9 @@
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
#include "eok.h"
|
#include "eok.h"
|
||||||
#else
|
#else // __APPLE__
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
#endif
|
#endif // __APPLE__
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
|
Loading…
Reference in New Issue