diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index a8ceaa848f..4d1701d848 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -269,13 +269,14 @@ public Connection getConn() throws Exception{ 用户可以在源代码的src/connector/python文件夹下找到python2和python3的安装包。用户可以通过pip命令安装: -​ `pip install src/connector/python/python2/` +​ `pip install src/connector/python/[linux|windows]/python2/` 或 -​ `pip install src/connector/python/python3/` +​ `pip install src/connector/python/[linux|windows]/python3/` 如果机器上没有pip命令,用户可将src/connector/python/python3或src/connector/python/python2下的taos文件夹拷贝到应用程序的目录使用。 +对于windows 客户端,安装TDengine windows 客户端后,将C:\TDengine\driver\taos.dll拷贝到C:\windows\system32目录下即可。所有TDengine的连接器,均需依赖taos.dll。 ### Python客户端接口 diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index a19579b7d4..d958679544 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -61,13 +61,13 @@ jmethodID g_rowdataSetByteArrayFp; void jniGetGlobalMethod(JNIEnv *env) { // make sure init function executed once - switch (__sync_val_compare_and_swap_32(&__init, 0, 1)) { + switch (atomic_val_compare_exchange_32(&__init, 0, 1)) { case 0: break; case 1: do { taosMsleep(0); - } while (__sync_val_load_32(&__init) == 1); + } while (atomic_load_32(&__init) == 1); case 2: return; } @@ -107,7 +107,7 @@ void jniGetGlobalMethod(JNIEnv *env) { g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V"); (*env)->DeleteLocalRef(env, rowdataClass); - __sync_val_restore_32(&__init, 2); + atomic_store_32(&__init, 2); jniTrace("native method register finished"); } diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index dfeebc8c61..24d8ad902a 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -353,7 +353,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) { } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { - if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { pSqlObj->res.code = abs(pSupporter->pState->code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); @@ -412,7 +412,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taos_fetch_rows_a(tres, joinRetrieveCallback, param); } else if (numOfRows == 0) { // no data from this vnode anymore - if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, @@ -451,7 +451,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); } - if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code); if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { pParentSql->res.code = abs(pSupporter->pState->code); @@ -560,7 +560,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; - // if (__sync_add_and_fetch_32(pSupporter->numOfComplete, 1) >= + // if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >= // pSupporter->numOfTotal) { // SSqlObj *pParentObj = pSupporter->pObj; // @@ -605,7 +605,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { quitAllSubquery(pParentSql, pSupporter); } else { - if (__sync_add_and_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { tscSetupOutputColumnIndex(pParentSql); if (pParentSql->fp == NULL) { diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 398cd1cca6..c1fe0c8674 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -432,11 +432,10 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; // there is no more result, so we release all allocated resource - SLocalReducer *pLocalReducer = - (SLocalReducer *)__sync_val_compare_and_swap_64(&pRes->pLocalReducer, pRes->pLocalReducer, 0); + SLocalReducer *pLocalReducer = (SLocalReducer*)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); if (pLocalReducer != NULL) { int32_t status = 0; - while ((status = __sync_val_compare_and_swap_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, + while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) { taosMsleep(100); tscTrace("%p waiting for delete procedure, status: %d", pSql, status); @@ -1328,7 +1327,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) { // set the data merge in progress int32_t prevStatus = - __sync_val_compare_and_swap_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); + atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) { assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e5fc8fbb4e..3dbf6596f1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1030,13 +1030,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq tscProcessSql(pNew); return; } else { // reach the maximum retry count, abort - __sync_val_compare_and_swap_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); + atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, numOfRows, idx, trsupport->pState->code); } } - if (__sync_add_and_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { + if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { return tscFreeSubSqlObj(trsupport, pSql); } @@ -1102,7 +1102,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); - __sync_add_and_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows); + atomic_add_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows); tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, pRes->numOfRows, trsupport->pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); @@ -1161,7 +1161,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); } - if (__sync_add_and_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { + if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { return tscFreeSubSqlObj(trsupport, pSql); } @@ -1290,7 +1290,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (code != TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code); - __sync_val_compare_and_swap_32(&trsupport->pState->code, 0, code); + atomic_val_compare_exchange_32(&trsupport->pState->code, 0, code); } else { // does not reach the maximum retry count, go on tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index ec8233c6f9..c9809be1e3 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -546,7 +546,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p void taos_close_stream(TAOS_STREAM *handle) { SSqlStream *pStream = (SSqlStream *)handle; - SSqlObj *pSql = (SSqlObj *)__sync_val_compare_and_swap_64(&pStream->pSql, pStream->pSql, 0); + SSqlObj *pSql = (SSqlObj *)atomic_exchange_ptr(&pStream->pSql, 0); if (pSql == NULL) { return; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 5e383c9d0b..2842754e40 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -182,51 +182,45 @@ void taos_init_imp() { void taos_init() { pthread_once(&tscinit, taos_init_imp); } -int taos_options(TSDB_OPTION option, const void *arg, ...) { - char * pStr = NULL; - SGlobalConfig *cfg_configDir = tsGetConfigOption("configDir"); - SGlobalConfig *cfg_activetimer = tsGetConfigOption("shellActivityTimer"); - SGlobalConfig *cfg_locale = tsGetConfigOption("locale"); - SGlobalConfig *cfg_charset = tsGetConfigOption("charset"); - SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); - SGlobalConfig *cfg_socket = tsGetConfigOption("sockettype"); +static int taos_options_imp(TSDB_OPTION option, const char *pStr) { + SGlobalConfig *cfg = NULL; switch (option) { case TSDB_OPTION_CONFIGDIR: - pStr = (char *)arg; - if (cfg_configDir && cfg_configDir->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + cfg = tsGetConfigOption("configDir"); + if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { strncpy(configDir, pStr, TSDB_FILENAME_LEN); - cfg_configDir->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; tscPrint("set config file directory:%s", pStr); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_configDir->option, pStr, - tsCfgStatusStr[cfg_configDir->cfgStatus], (char *)cfg_configDir->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; case TSDB_OPTION_SHELL_ACTIVITY_TIMER: - if (cfg_activetimer && cfg_activetimer->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { - tsShellActivityTimer = atoi((char *)arg); + cfg = tsGetConfigOption("shellActivityTimer"); + if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + tsShellActivityTimer = atoi(pStr); if (tsShellActivityTimer < 1) tsShellActivityTimer = 1; if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600; - cfg_activetimer->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; tscPrint("set shellActivityTimer:%d", tsShellActivityTimer); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg_activetimer->option, pStr, - tsCfgStatusStr[cfg_activetimer->cfgStatus], (int32_t *)cfg_activetimer->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr, + tsCfgStatusStr[cfg->cfgStatus], (int32_t *)cfg->ptr); } break; case TSDB_OPTION_LOCALE: { // set locale - pStr = (char *)arg; - + cfg = tsGetConfigOption("locale"); size_t len = strlen(pStr); if (len == 0 || len > TSDB_LOCALE_LEN) { tscPrint("Invalid locale:%s, use default", pStr); return -1; } - if (cfg_locale && cfg_charset && cfg_locale->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + if (cfg && cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { char sep = '.'; if (strlen(tsLocale) == 0) { // locale does not set yet @@ -239,7 +233,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { if (locale != NULL) { tscPrint("locale set, prev locale:%s, new locale:%s", tsLocale, locale); - cfg_locale->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; } else { // set the user-specified localed failed, use default LC_CTYPE as current locale locale = setlocale(LC_CTYPE, tsLocale); tscPrint("failed to set locale:%s, current locale:%s", pStr, tsLocale); @@ -261,7 +255,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { } strncpy(tsCharset, charset, tListLen(tsCharset)); - cfg_charset->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; } else { tscPrint("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset); @@ -272,23 +266,22 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { tscPrint("charset remains:%s", tsCharset); } } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_locale->option, pStr, - tsCfgStatusStr[cfg_locale->cfgStatus], (char *)cfg_locale->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; } case TSDB_OPTION_CHARSET: { /* set charset will override the value of charset, assigned during system locale changed */ - pStr = (char *)arg; - + cfg = tsGetConfigOption("charset"); size_t len = strlen(pStr); if (len == 0 || len > TSDB_LOCALE_LEN) { tscPrint("failed to set charset:%s", pStr); return -1; } - if (cfg_charset && cfg_charset->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (taosValidateEncodec(pStr)) { if (strlen(tsCharset) == 0) { tscPrint("charset is set:%s", pStr); @@ -297,40 +290,41 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { } strncpy(tsCharset, pStr, tListLen(tsCharset)); - cfg_charset->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; } else { tscPrint("charset:%s not valid", pStr); } } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_charset->option, pStr, - tsCfgStatusStr[cfg_charset->cfgStatus], (char *)cfg_charset->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; } case TSDB_OPTION_TIMEZONE: - pStr = (char *)arg; - if (cfg_timezone && cfg_timezone->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + cfg = tsGetConfigOption("timezone"); + if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { strcpy(tsTimezone, pStr); tsSetTimeZone(); - cfg_timezone->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; tscTrace("timezone set:%s, input:%s by taos_options", tsTimezone, pStr); } else { - tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg_timezone->option, pStr, - tsCfgStatusStr[cfg_timezone->cfgStatus], (char *)cfg_timezone->ptr); + tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr, + tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr); } break; case TSDB_OPTION_SOCKET_TYPE: - if (cfg_socket && cfg_socket->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { - if (strcasecmp(arg, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(arg, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { + cfg = tsGetConfigOption("sockettype"); + if (cfg && cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { + if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); return -1; } - strncpy(tsSocketType, arg, tListLen(tsSocketType)); - cfg_socket->cfgStatus = TSDB_CFG_CSTATUS_OPTION; + strncpy(tsSocketType, pStr, tListLen(tsSocketType)); + cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; tscPrint("socket type is set:%s", tsSocketType); } break; @@ -342,3 +336,20 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { return 0; } + + +int taos_options(TSDB_OPTION option, const void *arg, ...) { + static int32_t lock = 0; + + for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) { + if (i % 1000 == 0) { + tscPrint("haven't acquire lock after spin %d times.", i); + sched_yield(); + } + } + + int ret = taos_options_imp(option, (const char*)arg); + + atomic_store_32(&lock, 0); + return ret; +} \ No newline at end of file diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index 026d48ba08..09ced23c91 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -247,7 +247,7 @@ typedef struct { extern SGlobalConfig *tsGlobalConfig; extern int tsGlobalConfigNum; extern char * tsCfgStatusStr[]; -SGlobalConfig *tsGetConfigOption(char *option); +SGlobalConfig *tsGetConfigOption(const char *option); #define TSDB_CFG_MAX_NUM 110 #define TSDB_CFG_PRINT_LEN 23 diff --git a/src/modules/http/src/httpServer.c b/src/modules/http/src/httpServer.c index 8b981e0f84..49ff3562bf 100644 --- a/src/modules/http/src/httpServer.c +++ b/src/modules/http/src/httpServer.c @@ -72,7 +72,7 @@ void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) { } bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) { - return (__sync_val_compare_and_swap_32(&pContext->state, srcState, destState) == srcState); + return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); } void httpFreeContext(HttpServer *pServer, HttpContext *pContext); @@ -124,7 +124,7 @@ void httpCleanUpContextTimer(HttpContext *pContext) { void httpCleanUpContext(HttpContext *pContext) { httpTrace("context:%p, start the clean up operation", pContext); - __sync_val_compare_and_swap_64(&pContext->signature, pContext, 0); + atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0); if (pContext->signature != NULL) { httpTrace("context:%p is freed by another thread.", pContext); return; @@ -494,7 +494,7 @@ void httpProcessHttpData(void *param) { } else { if (httpReadData(pThread, pContext)) { (*(pThread->processData))(pContext); - __sync_fetch_and_add(&pThread->pServer->requestNum, 1); + atomic_fetch_add_32(&pThread->pServer->requestNum, 1); } } } diff --git a/src/modules/http/src/httpSystem.c b/src/modules/http/src/httpSystem.c index ece7c17814..df49251f13 100644 --- a/src/modules/http/src/httpSystem.c +++ b/src/modules/http/src/httpSystem.c @@ -77,7 +77,7 @@ int httpStartSystem() { if (httpServer == NULL) { httpError("http server is null"); - return -1; + httpInitSystem(); } if (httpServer->pContextPool == NULL) { @@ -148,7 +148,7 @@ void httpCleanUpSystem() { void httpGetReqCount(int32_t *httpReqestNum) { if (httpServer != NULL) { - *httpReqestNum = __sync_fetch_and_and(&httpServer->requestNum, 0); + *httpReqestNum = atomic_exchange_32(&httpServer->requestNum, 0); } else { *httpReqestNum = 0; } diff --git a/src/modules/monitor/src/monitorSystem.c b/src/modules/monitor/src/monitorSystem.c index 78cee40f96..4d6577c8f3 100644 --- a/src/modules/monitor/src/monitorSystem.c +++ b/src/modules/monitor/src/monitorSystem.c @@ -95,6 +95,9 @@ int monitorInitSystem() { } int monitorStartSystem() { + if (monitor == NULL) { + monitorInitSystem(); + } taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &monitor->initTimer); return 0; } diff --git a/src/os/darwin/inc/os.h b/src/os/darwin/inc/os.h index 83a56483b8..eabd5cd221 100644 --- a/src/os/darwin/inc/os.h +++ b/src/os/darwin/inc/os.h @@ -73,28 +73,71 @@ #define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) -// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler -// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. -#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap +#define atomic_val_compare_exchange_8 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_16 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_32 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_64 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap -#define __sync_add_and_fetch_64 __sync_add_and_fetch -#define __sync_add_and_fetch_32 __sync_add_and_fetch -#define __sync_add_and_fetch_16 __sync_add_and_fetch -#define __sync_add_and_fetch_8 __sync_add_and_fetch -#define __sync_add_and_fetch_ptr __sync_add_and_fetch +#define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) -#define __sync_sub_and_fetch_64 __sync_sub_and_fetch -#define __sync_sub_and_fetch_32 __sync_sub_and_fetch -#define __sync_sub_and_fetch_16 __sync_sub_and_fetch -#define __sync_sub_and_fetch_8 __sync_sub_and_fetch -#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch +#define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) -int32_t __sync_val_load_32(int32_t *ptr); -void __sync_val_restore_32(int32_t *ptr, int32_t newval); +#define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) #define SWAP(a, b, c) \ do { \ diff --git a/src/os/darwin/src/tdarwin.c b/src/os/darwin/src/tdarwin.c index 2f97c7c376..eec252d660 100644 --- a/src/os/darwin/src/tdarwin.c +++ b/src/os/darwin/src/tdarwin.c @@ -416,11 +416,3 @@ int tsem_post(dispatch_semaphore_t *sem) { int tsem_destroy(dispatch_semaphore_t *sem) { return 0; } - -int32_t __sync_val_load_32(int32_t *ptr) { - return __atomic_load_n(ptr, __ATOMIC_ACQUIRE); -} - -void __sync_val_restore_32(int32_t *ptr, int32_t newval) { - __atomic_store_n(ptr, newval, __ATOMIC_RELEASE); -} \ No newline at end of file diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 5b44836ebe..8055777622 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -92,28 +92,71 @@ extern "C" { #define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) #define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) -// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler -// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. -#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap -#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap +#define atomic_val_compare_exchange_8 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_16 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_32 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_64 __sync_val_compare_and_swap +#define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap -#define __sync_add_and_fetch_64 __sync_add_and_fetch -#define __sync_add_and_fetch_32 __sync_add_and_fetch -#define __sync_add_and_fetch_16 __sync_add_and_fetch -#define __sync_add_and_fetch_8 __sync_add_and_fetch -#define __sync_add_and_fetch_ptr __sync_add_and_fetch +#define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST) -#define __sync_sub_and_fetch_64 __sync_sub_and_fetch -#define __sync_sub_and_fetch_32 __sync_sub_and_fetch -#define __sync_sub_and_fetch_16 __sync_sub_and_fetch -#define __sync_sub_and_fetch_8 __sync_sub_and_fetch -#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch +#define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST) -int32_t __sync_val_load_32(int32_t *ptr); -void __sync_val_restore_32(int32_t *ptr, int32_t newval); +#define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST) #define SWAP(a, b, c) \ do { \ diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index b5271006e2..f78250cb52 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -340,11 +340,3 @@ bool taosSkipSocketCheck() { return false; } - -int32_t __sync_val_load_32(int32_t *ptr) { - return __atomic_load_n(ptr, __ATOMIC_ACQUIRE); -} - -void __sync_val_restore_32(int32_t *ptr, int32_t newval) { - __atomic_store_n(ptr, newval, __ATOMIC_RELEASE); -} diff --git a/src/os/windows/inc/os.h b/src/os/windows/inc/os.h index 3f61ff12b5..a1a4bdfa5c 100644 --- a/src/os/windows/inc/os.h +++ b/src/os/windows/inc/os.h @@ -81,6 +81,10 @@ extern "C" { #if defined(_M_ARM) || defined(_M_ARM64) +/* the '__iso_volatile' functions does not use a memory fence, so these + * definitions are incorrect, comment out as we don't support Windows on + * ARM at present. + #define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr)) #define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr)) #define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr)) @@ -98,7 +102,7 @@ extern "C" { #define atomic_load_ptr atomic_load_32 #define atomic_store_ptr atomic_store_32 #endif - +*/ #else #define atomic_load_8(ptr) (*(char volatile*)(ptr)) @@ -121,35 +125,152 @@ extern "C" { #define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val)) #define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val)) -#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval)) -#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval)) -#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval)) -#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval)) -#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval)) +#define atomic_val_compare_exchange_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval)) +#define atomic_val_compare_exchange_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval)) +#define atomic_val_compare_exchange_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval)) +#define atomic_val_compare_exchange_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval)) +#define atomic_val_compare_exchange_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval)) -char interlocked_add_8(char volatile *ptr, char val); -short interlocked_add_16(short volatile *ptr, short val); -long interlocked_add_32(long volatile *ptr, long val); -__int64 interlocked_add_64(__int64 volatile *ptr, __int64 val); +char interlocked_add_fetch_8(char volatile *ptr, char val); +short interlocked_add_fetch_16(short volatile *ptr, short val); +long interlocked_add_fetch_32(long volatile *ptr, long val); +__int64 interlocked_add_fetch_64(__int64 volatile *ptr, __int64 val); -#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val)) -#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val)) -#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val)) -#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val)) +#define atomic_add_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), (char)(val)) +#define atomic_add_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), (short)(val)) +#define atomic_add_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), (long)(val)) +#define atomic_add_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), (__int64)(val)) #ifdef _WIN64 - #define __sync_add_and_fetch_ptr __sync_add_and_fetch_64 + #define atomic_add_fetch_ptr atomic_add_fetch_64 #else - #define __sync_add_and_fetch_ptr __sync_add_and_fetch_32 + #define atomic_add_fetch_ptr atomic_add_fetch_32 #endif -#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val)) -#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val)) -#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val)) -#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val)) -#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val)) +#define atomic_fetch_add_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), (char)(val)) +#define atomic_fetch_add_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), (short)(val)) +#define atomic_fetch_add_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), (long)(val)) +#define atomic_fetch_add_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), (__int64)(val)) +#ifdef _WIN64 + #define atomic_fetch_add_ptr atomic_fetch_add_64 +#else + #define atomic_fetch_add_ptr atomic_fetch_add_32 +#endif -int32_t __sync_val_load_32(int32_t *ptr); -void __sync_val_restore_32(int32_t *ptr, int32_t newval); +#define atomic_sub_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), -(char)(val)) +#define atomic_sub_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), -(short)(val)) +#define atomic_sub_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), -(long)(val)) +#define atomic_sub_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), -(__int64)(val)) +#ifdef _WIN64 + #define atomic_sub_fetch_ptr atomic_sub_fetch_64 +#else + #define atomic_sub_fetch_ptr atomic_sub_fetch_32 +#endif + +#define atomic_fetch_sub_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), -(char)(val)) +#define atomic_fetch_sub_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), -(short)(val)) +#define atomic_fetch_sub_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), -(long)(val)) +#define atomic_fetch_sub_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), -(__int64)(val)) +#ifdef _WIN64 + #define atomic_fetch_sub_ptr atomic_fetch_sub_64 +#else + #define atomic_fetch_sub_ptr atomic_fetch_sub_32 +#endif + +char interlocked_and_fetch_8(char volatile* ptr, char val); +short interlocked_and_fetch_16(short volatile* ptr, short val); +long interlocked_and_fetch_32(long volatile* ptr, long val); +__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val); + +#define atomic_and_fetch_8(ptr, val) interlocked_and_fetch_8((char volatile*)(ptr), (char)(val)) +#define atomic_and_fetch_16(ptr, val) interlocked_and_fetch_16((short volatile*)(ptr), (short)(val)) +#define atomic_and_fetch_32(ptr, val) interlocked_and_fetch_32((long volatile*)(ptr), (long)(val)) +#define atomic_and_fetch_64(ptr, val) interlocked_and_fetch_64((__int64 volatile*)(ptr), (__int64)(val)) +#ifdef _WIN64 + #define atomic_and_fetch_ptr atomic_and_fetch_64 +#else + #define atomic_and_fetch_ptr atomic_and_fetch_32 +#endif + +#define atomic_fetch_and_8(ptr, val) _InterlockedAnd8((char volatile*)(ptr), (char)(val)) +#define atomic_fetch_and_16(ptr, val) _InterlockedAnd16((short volatile*)(ptr), (short)(val)) +#define atomic_fetch_and_32(ptr, val) _InterlockedAnd((long volatile*)(ptr), (long)(val)) + +#ifdef _M_IX86 + __int64 interlocked_fetch_and_64(__int64 volatile* ptr, __int64 val); + #define atomic_fetch_and_64(ptr, val) interlocked_fetch_and_64((__int64 volatile*)(ptr), (__int64)(val)) +#else + #define atomic_fetch_and_64(ptr, val) _InterlockedAnd64((__int64 volatile*)(ptr), (__int64)(val)) +#endif + +#ifdef _WIN64 + #define atomic_fetch_and_ptr atomic_fetch_and_64 +#else + #define atomic_fetch_and_ptr atomic_fetch_and_32 +#endif + +char interlocked_or_fetch_8(char volatile* ptr, char val); +short interlocked_or_fetch_16(short volatile* ptr, short val); +long interlocked_or_fetch_32(long volatile* ptr, long val); +__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val); + +#define atomic_or_fetch_8(ptr, val) interlocked_or_fetch_8((char volatile*)(ptr), (char)(val)) +#define atomic_or_fetch_16(ptr, val) interlocked_or_fetch_16((short volatile*)(ptr), (short)(val)) +#define atomic_or_fetch_32(ptr, val) interlocked_or_fetch_32((long volatile*)(ptr), (long)(val)) +#define atomic_or_fetch_64(ptr, val) interlocked_or_fetch_64((__int64 volatile*)(ptr), (__int64)(val)) +#ifdef _WIN64 + #define atomic_or_fetch_ptr atomic_or_fetch_64 +#else + #define atomic_or_fetch_ptr atomic_or_fetch_32 +#endif + +#define atomic_fetch_or_8(ptr, val) _InterlockedOr8((char volatile*)(ptr), (char)(val)) +#define atomic_fetch_or_16(ptr, val) _InterlockedOr16((short volatile*)(ptr), (short)(val)) +#define atomic_fetch_or_32(ptr, val) _InterlockedOr((long volatile*)(ptr), (long)(val)) + +#ifdef _M_IX86 + __int64 interlocked_fetch_or_64(__int64 volatile* ptr, __int64 val); + #define atomic_fetch_or_64(ptr, val) interlocked_fetch_or_64((__int64 volatile*)(ptr), (__int64)(val)) +#else + #define atomic_fetch_or_64(ptr, val) _InterlockedOr64((__int64 volatile*)(ptr), (__int64)(val)) +#endif + +#ifdef _WIN64 + #define atomic_fetch_or_ptr atomic_fetch_or_64 +#else + #define atomic_fetch_or_ptr atomic_fetch_or_32 +#endif + +char interlocked_xor_fetch_8(char volatile* ptr, char val); +short interlocked_xor_fetch_16(short volatile* ptr, short val); +long interlocked_xor_fetch_32(long volatile* ptr, long val); +__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val); + +#define atomic_xor_fetch_8(ptr, val) interlocked_xor_fetch_8((char volatile*)(ptr), (char)(val)) +#define atomic_xor_fetch_16(ptr, val) interlocked_xor_fetch_16((short volatile*)(ptr), (short)(val)) +#define atomic_xor_fetch_32(ptr, val) interlocked_xor_fetch_32((long volatile*)(ptr), (long)(val)) +#define atomic_xor_fetch_64(ptr, val) interlocked_xor_fetch_64((__int64 volatile*)(ptr), (__int64)(val)) +#ifdef _WIN64 + #define atomic_xor_fetch_ptr atomic_xor_fetch_64 +#else + #define atomic_xor_fetch_ptr atomic_xor_fetch_32 +#endif + +#define atomic_fetch_xor_8(ptr, val) _InterlockedXor8((char volatile*)(ptr), (char)(val)) +#define atomic_fetch_xor_16(ptr, val) _InterlockedXor16((short volatile*)(ptr), (short)(val)) +#define atomic_fetch_xor_32(ptr, val) _InterlockedXor((long volatile*)(ptr), (long)(val)) + +#ifdef _M_IX86 + __int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val); + #define atomic_fetch_xor_64(ptr, val) interlocked_fetch_xor_64((__int64 volatile*)(ptr), (__int64)(val)) +#else + #define atomic_fetch_xor_64(ptr, val) _InterlockedXor64((__int64 volatile*)(ptr), (__int64)(val)) +#endif + +#ifdef _WIN64 + #define atomic_fetch_xor_ptr atomic_fetch_xor_64 +#else + #define atomic_fetch_xor_ptr atomic_fetch_xor_32 +#endif #define SWAP(a, b, c) \ do { \ diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 106cb903b1..6c10ced6a2 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -66,31 +66,143 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle return setsockopt(socketfd, level, optname, optval, optlen); } - -char interlocked_add_8(char volatile* ptr, char val) { +// add +char interlocked_add_fetch_8(char volatile* ptr, char val) { return _InterlockedExchangeAdd8(ptr, val) + val; } -short interlocked_add_16(short volatile* ptr, short val) { +short interlocked_add_fetch_16(short volatile* ptr, short val) { return _InterlockedExchangeAdd16(ptr, val) + val; } -long interlocked_add_32(long volatile* ptr, long val) { +long interlocked_add_fetch_32(long volatile* ptr, long val) { return _InterlockedExchangeAdd(ptr, val) + val; } -__int64 interlocked_add_64(__int64 volatile* ptr, __int64 val) { +__int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) { return _InterlockedExchangeAdd64(ptr, val) + val; } -int32_t __sync_val_load_32(int32_t *ptr) { - return InterlockedOr(ptr, 0); +// and +char interlocked_and_fetch_8(char volatile* ptr, char val) { + return _InterlockedAnd8(ptr, val) & val; } -void __sync_val_restore_32(int32_t *ptr, int32_t newval) { - InterlockedCompareExchange(ptr, *ptr, newval); +short interlocked_and_fetch_16(short volatile* ptr, short val) { + return _InterlockedAnd16(ptr, val) & val; } +long interlocked_and_fetch_32(long volatile* ptr, long val) { + return _InterlockedAnd(ptr, val) & val; +} + +#ifndef _M_IX86 + +__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val) { + return _InterlockedAnd64(ptr, val) & val; +} + +#else + +__int64 interlocked_and_fetch_64(__int64 volatile* ptr, __int64 val) { + __int64 old, res; + do { + old = *ptr; + res = old & val; + } while(_InterlockedCompareExchange64(ptr, res, old) != old); + return res; +} + +__int64 interlocked_fetch_and_64(__int64 volatile* ptr, __int64 val) { + __int64 old; + do { + old = *ptr; + } while(_InterlockedCompareExchange64(ptr, old & val, old) != old); + return old; +} + +#endif + +// or +char interlocked_or_fetch_8(char volatile* ptr, char val) { + return _InterlockedOr8(ptr, val) | val; +} + +short interlocked_or_fetch_16(short volatile* ptr, short val) { + return _InterlockedOr16(ptr, val) | val; +} + +long interlocked_or_fetch_32(long volatile* ptr, long val) { + return _InterlockedOr(ptr, val) | val; +} + +#ifndef _M_IX86 + +__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val) { + return _InterlockedOr64(ptr, val) & val; +} + +#else + +__int64 interlocked_or_fetch_64(__int64 volatile* ptr, __int64 val) { + __int64 old, res; + do { + old = *ptr; + res = old | val; + } while(_InterlockedCompareExchange64(ptr, res, old) != old); + return res; +} + +__int64 interlocked_fetch_or_64(__int64 volatile* ptr, __int64 val) { + __int64 old; + do { + old = *ptr; + } while(_InterlockedCompareExchange64(ptr, old | val, old) != old); + return old; +} + +#endif + +// xor +char interlocked_xor_fetch_8(char volatile* ptr, char val) { + return _InterlockedXor8(ptr, val) ^ val; +} + +short interlocked_xor_fetch_16(short volatile* ptr, short val) { + return _InterlockedXor16(ptr, val) ^ val; +} + +long interlocked_xor_fetch_32(long volatile* ptr, long val) { + return _InterlockedXor(ptr, val) ^ val; +} + +#ifndef _M_IX86 + +__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val) { + return _InterlockedXor64(ptr, val) ^ val; +} + +#else + +__int64 interlocked_xor_fetch_64(__int64 volatile* ptr, __int64 val) { + __int64 old, res; + do { + old = *ptr; + res = old ^ val; + } while(_InterlockedCompareExchange64(ptr, res, old) != old); + return res; +} + +__int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val) { + __int64 old; + do { + old = *ptr; + } while(_InterlockedCompareExchange64(ptr, old ^ val, old) != old); + return old; +} + +#endif + void tsPrintOsInfo() {} char *taosCharsetReplace(char *charsetstr) { diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index 6f92154e9e..41c628e169 100644 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); + pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; @@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) { pHeader->spi = 0; pHeader->tcp = 0; pHeader->encrypt = 0; - pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); - if (pHeader->tranId == 0) pHeader->tranId = __sync_add_and_fetch_32(&pConn->tranId, 1); + pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); + if (pHeader->tranId == 0) pHeader->tranId = atomic_add_fetch_32(&pConn->tranId, 1); pHeader->sourceId = pConn->ownId; pHeader->destId = pConn->peerId; diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 4d2ebdfa35..373d9e713b 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -218,15 +218,15 @@ typedef struct { * Only the QInfo.signature == QInfo, this structure can be released safely. */ #define TSDB_QINFO_QUERY_FLAG 0x1 -#define TSDB_QINFO_RESET_SIG(x) ((x)->signature = (uint64_t)(x)) +#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) #define TSDB_QINFO_SET_QUERY_FLAG(x) \ - __sync_val_compare_and_swap(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); + atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); // live lock: wait for query reaching a safe-point, release all resources // belongs to this query #define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ { \ - while (__sync_val_compare_and_swap(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ + while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ taosMsleep(1); \ } \ } diff --git a/src/system/detail/src/dnodeSystem.c b/src/system/detail/src/dnodeSystem.c index a0fdf95fc8..1b5d9418b4 100644 --- a/src/system/detail/src/dnodeSystem.c +++ b/src/system/detail/src/dnodeSystem.c @@ -33,8 +33,8 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Woverflow" -SModule tsModule[TSDB_MOD_MAX]; -uint32_t tsModuleStatus; +SModule tsModule[TSDB_MOD_MAX] = {0}; +uint32_t tsModuleStatus = 0; pthread_mutex_t dmutex; extern int vnodeSelectReqNum; extern int vnodeInsertReqNum; @@ -216,8 +216,8 @@ void dnodeResetSystem() { void dnodeCountRequest(SCountInfo *info) { httpGetReqCount(&info->httpReqNum); - info->selectReqNum = __sync_fetch_and_and(&vnodeSelectReqNum, 0); - info->insertReqNum = __sync_fetch_and_and(&vnodeInsertReqNum, 0); + info->selectReqNum = atomic_exchange_32(&vnodeSelectReqNum, 0); + info->insertReqNum = atomic_exchange_32(&vnodeInsertReqNum, 0); } #pragma GCC diagnostic pop \ No newline at end of file diff --git a/src/system/detail/src/mgmtShell.c b/src/system/detail/src/mgmtShell.c index 2661ec0d37..f74ee9f51f 100644 --- a/src/system/detail/src/mgmtShell.c +++ b/src/system/detail/src/mgmtShell.c @@ -922,8 +922,8 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { taosSendMsgToPeer(pConn->thandle, pStart, msgLen); if (rowsToRead == 0) { - int64_t oldSign = __sync_val_compare_and_swap(&pShow->signature, (uint64_t)pShow, 0); - if (oldSign != (uint64_t)pShow) { + uintptr_t oldSign = atomic_val_compare_exchange_ptr(&pShow->signature, pShow, 0); + if (oldSign != (uintptr_t)pShow) { return msgLen; } // pShow->signature = 0; @@ -1093,8 +1093,8 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { } void mgmtEstablishConn(SConnObj *pConn) { - __sync_fetch_and_add(&mgmtShellConns, 1); - __sync_fetch_and_add(&sdbExtConns, 1); + atomic_fetch_add_32(&mgmtShellConns, 1); + atomic_fetch_add_32(&sdbExtConns, 1); pConn->stime = taosGetTimestampMs(); if (strcmp(pConn->pUser->user, "root") == 0 || strcmp(pConn->pUser->user, pConn->pAcct->user) == 0) { @@ -1168,8 +1168,8 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (pConn->pAcct) { mgmtRemoveConnFromAcct(pConn); - __sync_fetch_and_sub(&mgmtShellConns, 1); - __sync_fetch_and_sub(&sdbExtConns, 1); + atomic_fetch_sub_32(&mgmtShellConns, 1); + atomic_fetch_sub_32(&sdbExtConns, 1); } code = 0; @@ -1227,8 +1227,8 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { if (msg == NULL) { if (pConn) { mgmtRemoveConnFromAcct(pConn); - __sync_fetch_and_sub(&mgmtShellConns, 1); - __sync_fetch_and_sub(&sdbExtConns, 1); + atomic_fetch_sub_32(&mgmtShellConns, 1); + atomic_fetch_sub_32(&sdbExtConns, 1); mTrace("connection from %s is closed", pConn->pUser->user); memset(pConn, 0, sizeof(SConnObj)); } diff --git a/src/system/detail/src/vnodeCache.c b/src/system/detail/src/vnodeCache.c index 8b51bc4609..f4bea682e9 100644 --- a/src/system/detail/src/vnodeCache.c +++ b/src/system/detail/src/vnodeCache.c @@ -256,7 +256,7 @@ void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count) { tslot = (tslot + 1) % pInfo->maxBlocks; } - __sync_fetch_and_add(&pObj->freePoints, pObj->pointsPerBlock * slots); + atomic_fetch_add_32(&pObj->freePoints, pObj->pointsPerBlock * slots); pInfo->commitSlot = slot; pInfo->commitPoint = pos; pObj->commitCount = count; @@ -505,7 +505,7 @@ int vnodeInsertPointToCache(SMeterObj *pObj, char *pData) { pData += pObj->schema[col].bytes; } - __sync_fetch_and_sub(&pObj->freePoints, 1); + atomic_fetch_sub_32(&pObj->freePoints, 1); pCacheBlock->numOfPoints++; pPool->count++; @@ -1114,7 +1114,7 @@ int vnodeSyncRestoreCache(int vnode, int fd) { for (int col = 0; col < pObj->numOfColumns; ++col) if (taosReadMsg(fd, pBlock->offset[col], pObj->schema[col].bytes * points) <= 0) return -1; - __sync_fetch_and_sub(&pObj->freePoints, points); + atomic_fetch_sub_32(&pObj->freePoints, points); blocksReceived++; pointsReceived += points; pObj->lastKey = *((TSKEY *)(pBlock->offset[0] + pObj->schema[0].bytes * (points - 1))); diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index df94c883ac..fdafc01352 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -410,7 +410,7 @@ void vnodeRemoveFile(int vnode, int fileId) { int fd = open(headName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (fd > 0) { vnodeGetHeadFileHeaderInfo(fd, &headInfo); - __sync_fetch_and_add(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage); + atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage); close(fd); } diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index f50b6f4946..ed9c319216 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -497,7 +497,7 @@ int vnodeImportToFile(SImportInfo *pImport) { pInfo->commitPoint = 0; pCacheBlock->numOfPoints = points; if (slot == pInfo->currentSlot) { - __sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint); + atomic_fetch_add_32(&pObj->freePoints, pInfo->commitPoint); } } else { // if last block is full and committed @@ -625,7 +625,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { } code = 0; - __sync_fetch_and_sub(&pObj->freePoints, rows); + atomic_fetch_sub_32(&pObj->freePoints, rows); dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); _exit: diff --git a/src/system/detail/src/vnodeMeter.c b/src/system/detail/src/vnodeMeter.c index a595e8f689..c001daf7da 100644 --- a/src/system/detail/src/vnodeMeter.c +++ b/src/system/detail/src/vnodeMeter.c @@ -643,8 +643,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pData += pObj->bytesPerPoint; points++; } - __sync_fetch_and_add(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1)); - __sync_fetch_and_add(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint); + atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1)); + atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint); pthread_mutex_lock(&(pVnode->vmutex)); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 654f9537e4..9967094a26 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -3890,14 +3890,14 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; if (pSupporter == NULL || pSupporter->numOfMeters == 1) { - __sync_fetch_and_sub(&pQInfo->pObj->numOfQueries, 1); + atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); } else { int32_t num = 0; for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { SMeterObj *pMeter = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[i]->sid); - __sync_fetch_and_sub(&(pMeter->numOfQueries), 1); + atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid, diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index bbe2f60274..1b4b43f5a2 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -365,7 +365,7 @@ _query_over: vnodeFreeColumnInfo(&pQueryMsg->colList[i]); } - __sync_fetch_and_add(&vnodeSelectReqNum, 1); + atomic_fetch_add_32(&vnodeSelectReqNum, 1); return ret; } @@ -599,6 +599,6 @@ _submit_over: // for import, send the submit response only when return code is not zero if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints); - __sync_fetch_and_add(&vnodeInsertReqNum, 1); + atomic_fetch_add_32(&vnodeInsertReqNum, 1); return ret; } diff --git a/src/system/detail/src/vnodeStore.c b/src/system/detail/src/vnodeStore.c index e00b4de7bc..e4294d72c6 100644 --- a/src/system/detail/src/vnodeStore.c +++ b/src/system/detail/src/vnodeStore.c @@ -351,7 +351,7 @@ void vnodeCalcOpenVnodes() { openVnodes++; } - __sync_val_compare_and_swap(&tsOpenVnodes, tsOpenVnodes, openVnodes); + atomic_store_32(&tsOpenVnodes, openVnodes); } void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables) { diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index 7a2fb5530c..b097b4706c 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -567,7 +567,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid * check if the numOfQueries is 0 or not. */ pMeterObjList[(*numOfInc)++] = pMeter; - __sync_fetch_and_add(&pMeter->numOfQueries, 1); + atomic_fetch_add_32(&pMeter->numOfQueries, 1); // output for meter more than one query executed if (pMeter->numOfQueries > 1) { @@ -591,7 +591,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, SMeterObj* pMeter = pMeterObjList[i]; if (pMeter != NULL) { // here, do not need to lock to perform operations - __sync_fetch_and_sub(&pMeter->numOfQueries, 1); + atomic_fetch_sub_32(&pMeter->numOfQueries, 1); if (pMeter->numOfQueries > 0) { dTrace("qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid, @@ -646,7 +646,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { } int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) { - return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state); + return atomic_val_compare_exchange_32(&pMeterObj->state, TSDB_METER_STATE_READY, state); } void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) { diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 7804cb0d0e..e92ac5d46d 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -516,7 +516,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k pNewNode->addTime = taosGetTimestampMs(); pNewNode->time = pNewNode->addTime + keepTime; - __sync_add_and_fetch_32(&pNewNode->refCount, 1); + atomic_add_fetch_32(&pNewNode->refCount, 1); // the address of this node may be changed, so the prev and next element should update the corresponding pointer taosUpdateInHashTable(pObj, pNewNode); @@ -529,7 +529,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k return NULL; } - __sync_add_and_fetch_32(&pNewNode->refCount, 1); + atomic_add_fetch_32(&pNewNode->refCount, 1); assert(hashVal == (*pObj->hashFp)(key, keyLen - 1)); pNewNode->hashVal = hashVal; @@ -558,7 +558,7 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui return NULL; } - __sync_add_and_fetch_32(&pNode->refCount, 1); + atomic_add_fetch_32(&pNode->refCount, 1); pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1); taosAddNodeToHashTable(pObj, pNode); @@ -616,7 +616,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) { } if (pNode->refCount > 0) { - __sync_sub_and_fetch_32(&pNode->refCount, 1); + atomic_sub_fetch_32(&pNode->refCount, 1); pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount); } else { /* @@ -676,20 +676,20 @@ void *taosGetDataFromCache(void *handle, char *key) { SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen); if (ptNode != NULL) { - __sync_add_and_fetch_32(&ptNode->refCount, 1); + atomic_add_fetch_32(&ptNode->refCount, 1); } __cache_unlock(pObj); if (ptNode != NULL) { - __sync_add_and_fetch_32(&pObj->statistics.hitCount, 1); + atomic_add_fetch_32(&pObj->statistics.hitCount, 1); pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount); } else { - __sync_add_and_fetch_32(&pObj->statistics.missCount, 1); + atomic_add_fetch_32(&pObj->statistics.missCount, 1); pTrace("key:%s not in cache,retrieved failed", key); } - __sync_add_and_fetch_32(&pObj->statistics.totalAccess, 1); + atomic_add_fetch_32(&pObj->statistics.totalAccess, 1); return (ptNode != NULL) ? ptNode->data : NULL; } diff --git a/src/util/src/textbuffer.c b/src/util/src/textbuffer.c index c6b56919e9..ebb6b9e0b8 100644 --- a/src/util/src/textbuffer.c +++ b/src/util/src/textbuffer.c @@ -60,7 +60,7 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { strcat(tmpPath, fileNamePrefix); strcat(tmpPath, "-%u-%u"); - snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), __sync_add_and_fetch_32(&tmpFileSerialNum, 1)); + snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1)); } /* diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 586f329001..bac89db7e2 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -364,7 +364,7 @@ void tsReadLogOption(char *option, char *value) { } } -SGlobalConfig *tsGetConfigOption(char *option) { +SGlobalConfig *tsGetConfigOption(const char *option) { tsInitGlobalConfig(); for (int i = 0; i < tsGlobalConfigNum; ++i) { SGlobalConfig *cfg = tsGlobalConfig + i; @@ -374,7 +374,7 @@ SGlobalConfig *tsGetConfigOption(char *option) { return NULL; } -void tsReadConfigOption(char *option, char *value) { +void tsReadConfigOption(const char *option, char *value) { for (int i = 0; i < tsGlobalConfigNum; ++i) { SGlobalConfig *cfg = tsGlobalConfig + i; if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_CONFIG)) continue; @@ -423,9 +423,7 @@ void tsInitConfigOption(SGlobalConfig *cfg, char *name, void *ptr, int8_t valTyp cfg->cfgStatus = TSDB_CFG_CSTATUS_NONE; } -void tsInitGlobalConfig() { - if (tsGlobalConfig != NULL) return; - +static void doInitGlobalConfig() { tsGlobalConfig = (SGlobalConfig *) malloc(sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM); memset(tsGlobalConfig, 0, sizeof(SGlobalConfig) * TSDB_CFG_MAX_NUM); @@ -783,6 +781,11 @@ void tsInitGlobalConfig() { tsGlobalConfigNum = (int)(cfg - tsGlobalConfig); } +static pthread_once_t initGlobalConfig = PTHREAD_ONCE_INIT; +void tsInitGlobalConfig() { + pthread_once(&initGlobalConfig, doInitGlobalConfig); +} + void tsReadGlobalLogConfig() { tsInitGlobalConfig(); diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 02e30be33a..b88cb7dc17 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -381,7 +381,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) } if (taosLogMaxLines > 0) { - __sync_add_and_fetch_32(&taosLogLines, 1); + atomic_add_fetch_32(&taosLogLines, 1); if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile(); } @@ -458,7 +458,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f taosPushLogBuffer(logHandle, buffer, len); if (taosLogMaxLines > 0) { - __sync_add_and_fetch_32(&taosLogLines, 1); + atomic_add_fetch_32(&taosLogLines, 1); if ((taosLogLines > taosLogMaxLines) && (openInProgress == 0)) taosOpenNewLogFile(); } diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 97e2189153..cf09aaee5e 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -105,15 +105,15 @@ static timer_map_t timerMap; static uintptr_t getNextTimerId() { uintptr_t id; do { - id = __sync_add_and_fetch_ptr(&nextTimerId, 1); + id = atomic_add_fetch_ptr(&nextTimerId, 1); } while (id == 0); return id; } -static void timerAddRef(tmr_obj_t* timer) { __sync_add_and_fetch_8(&timer->refCount, 1); } +static void timerAddRef(tmr_obj_t* timer) { atomic_add_fetch_8(&timer->refCount, 1); } static void timerDecRef(tmr_obj_t* timer) { - if (__sync_sub_and_fetch_8(&timer->refCount, 1) == 0) { + if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) { free(timer); } } @@ -121,7 +121,7 @@ static void timerDecRef(tmr_obj_t* timer) { static void lockTimerList(timer_list_t* list) { int64_t tid = taosGetPthreadId(); int i = 0; - while (__sync_val_compare_and_swap_64(&(list->lockedBy), 0, tid) != 0) { + while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) { if (++i % 1000 == 0) { sched_yield(); } @@ -130,7 +130,7 @@ static void lockTimerList(timer_list_t* list) { static void unlockTimerList(timer_list_t* list) { int64_t tid = taosGetPthreadId(); - if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) { + if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) { assert(false); tmrError("%d trying to unlock a timer list not locked by current thread.", tid); } @@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) { static void processExpiredTimer(void* handle, void* arg) { tmr_obj_t* timer = (tmr_obj_t*)handle; timer->executedBy = taosGetPthreadId(); - uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); + uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); if (state == TIMER_STATE_WAITING) { const char* fmt = "%s timer[id=%lld, fp=%p, param=%p] execution start."; tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); @@ -431,7 +431,7 @@ bool taosTmrStop(tmr_h timerId) { return false; } - uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); + uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); doStopTimer(timer, state); timerDecRef(timer); @@ -456,7 +456,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, if (timer == NULL) { tmrTrace("%s timer[id=%lld] does not exist", ctrl->label, id); } else { - uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); + uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); if (!doStopTimer(timer, state)) { timerDecRef(timer); timer = NULL; diff --git a/tests/examples/lua/build.sh b/tests/examples/lua/build.sh old mode 100644 new mode 100755 diff --git a/tests/examples/lua/lua_connector.c b/tests/examples/lua/lua_connector.c index f37657e822..f4065bb274 100644 --- a/tests/examples/lua/lua_connector.c +++ b/tests/examples/lua/lua_connector.c @@ -7,8 +7,15 @@ #include #include -static int l_connect(lua_State *L) -{ +struct cb_param{ + lua_State* state; + int callback; + void * stream; +}; + + + +static int l_connect(lua_State *L){ TAOS * taos; char *host = lua_tostring(L, 1); char *user = lua_tostring(L, 2); @@ -29,6 +36,7 @@ static int l_connect(lua_State *L) lua_pushstring(L, taos_errstr(taos)); lua_setfield(L, table_index, "error"); lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "conn"); }else{ printf("success to connect server\n"); lua_pushnumber(L, 0); @@ -49,7 +57,7 @@ static int l_query(lua_State *L){ lua_newtable(L); int table_index = lua_gettop(L); - printf("receive command:%s\r\n",s); + // printf("receive command:%s\r\n",s); if(taos_query(taos, s)!=0){ printf("failed, reason:%s\n", taos_errstr(taos)); lua_pushnumber(L, -1); @@ -78,8 +86,12 @@ static int l_query(lua_State *L){ TAOS_FIELD *fields = taos_fetch_fields(result); char temp[256]; + int affectRows = taos_affected_rows(taos); + // printf(" affect rows:%d\r\n", affectRows); lua_pushnumber(L, 0); lua_setfield(L, table_index, "code"); + lua_pushinteger(L, affectRows); + lua_setfield(L, table_index, "affected"); lua_newtable(L); while ((row = taos_fetch_row(result))) { @@ -95,7 +107,7 @@ static int l_query(lua_State *L){ } lua_pushstring(L,fields[i].name); - //printf("field name:%s,type:%d\n",fields[i].name,fields[i].type); + switch (fields[i].type) { case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); @@ -142,6 +154,115 @@ static int l_query(lua_State *L){ return 1; } +void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ + + struct cb_param* p = (struct cb_param*) param; + TAOS_FIELD *fields = taos_fetch_fields(result); + int numFields = taos_num_fields(result); + + printf("\n\r-----------------------------------------------------------------------------------\n"); + + // printf("r:%d, L:%d\n",p->callback, p->state); + + lua_State *L = p->state; + lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback); + + lua_newtable(L); + + for (int i = 0; i < numFields; ++i) { + if (row[i] == NULL) { + continue; + } + + lua_pushstring(L,fields[i].name); + + switch (fields[i].type) { + case TSDB_DATA_TYPE_TINYINT: + lua_pushinteger(L,*((char *)row[i])); + break; + case TSDB_DATA_TYPE_SMALLINT: + lua_pushinteger(L,*((short *)row[i])); + break; + case TSDB_DATA_TYPE_INT: + lua_pushinteger(L,*((int *)row[i])); + break; + case TSDB_DATA_TYPE_BIGINT: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_FLOAT: + lua_pushnumber(L,*((float *)row[i])); + break; + case TSDB_DATA_TYPE_DOUBLE: + lua_pushnumber(L,*((double *)row[i])); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + lua_pushstring(L,(char *)row[i]); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + lua_pushinteger(L,*((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_BOOL: + lua_pushinteger(L,*((char *)row[i])); + break; + default: + lua_pushnil(L); + break; + } + + lua_settable(L, -3); + } + + lua_call(L, 1, 0); + + printf("-----------------------------------------------------------------------------------\n\r"); +} + +static int l_open_stream(lua_State *L){ + int r = luaL_ref(L, LUA_REGISTRYINDEX); + TAOS * taos = lua_topointer(L,1); + char * sqlstr = lua_tostring(L,2); + int stime = luaL_checknumber(L,3); + + lua_newtable(L); + int table_index = lua_gettop(L); + + struct cb_param *p = malloc(sizeof(struct cb_param)); + p->state = L; + p->callback=r; + // printf("r:%d, L:%d\n",r,L); + void * s = taos_open_stream(taos,sqlstr,stream_cb,stime,p,NULL); + if (s == NULL) { + printf("failed to open stream, reason:%s\n", taos_errstr(taos)); + free(p); + lua_pushnumber(L, -1); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + lua_pushlightuserdata(L,NULL); + lua_setfield(L, table_index, "stream"); + }else{ + // printf("success to open stream\n"); + lua_pushnumber(L, 0); + lua_setfield(L, table_index, "code"); + lua_pushstring(L, taos_errstr(taos)); + lua_setfield(L, table_index, "error"); + p->stream = s; + lua_pushlightuserdata(L,p); + lua_setfield(L, table_index, "stream");//stream has different content in lua and c. + } + + return 1; +} + +static int l_close_stream(lua_State *L){ + //TODO:get stream and free cb_param + struct cb_param *p = lua_touserdata(L,1); + taos_close_stream(p->stream); + free(p); + return 0; +} + static int l_close(lua_State *L){ TAOS * taos= lua_topointer(L,1); lua_newtable(L); @@ -166,6 +287,8 @@ static const struct luaL_Reg lib[] = { {"connect", l_connect}, {"query", l_query}, {"close", l_close}, + {"open_stream", l_open_stream}, + {"close_stream", l_close_stream}, {NULL, NULL} }; diff --git a/tests/examples/lua/test.lua b/tests/examples/lua/test.lua index f644b82dd4..38ae1c82f2 100644 --- a/tests/examples/lua/test.lua +++ b/tests/examples/lua/test.lua @@ -35,10 +35,12 @@ if res.code ~=0 then return end -res = driver.query(conn,"insert into m1 values (1592222222222,0,'robotspace'), (1592222222223,1,'Hilink'),(1592222222224,2,'Harmony')") +res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')") if res.code ~=0 then print(res.error) return +else + print("insert successfully, affected:"..res.affected) end res = driver.query(conn,"select * from m1") @@ -55,4 +57,69 @@ else end end +res = driver.query(conn,"CREATE TABLE thermometer (ts timestamp, degree double) TAGS(location binary(20), type int)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"CREATE TABLE therm1 USING thermometer TAGS ('beijing', 1)") +if res.code ~=0 then + print(res.error) + return +end +res = driver.query(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.001', 20),('2019-09-01 00:00:00.002', 21)") + +if res.code ~=0 then + print(res.error) + return +else + print("insert successfully, affected:"..res.affected) +end + +res = driver.query(conn,"SELECT COUNT(*) count, AVG(degree) AS av, MAX(degree), MIN(degree) FROM thermometer WHERE location='beijing' or location='tianjin' GROUP BY location, type") +if res.code ~=0 then + print("select error:"..res.error) + return +else + print("in lua, result:") + for i = 1, #(res.item) do + print("res:"..res.item[i].count) + end +end + +function callback(t) + print("continuous query result:") + for key, value in pairs(t) do + print("key:"..key..", value:"..value) + end +end + +local stream +res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback) +if res.code ~=0 then + print("open stream error:"..res.error) + return +else + print("openstream ok") + stream = res.stream +end + +--From now on we begin continous query in an definite (infinite if you want) loop. +local loop_index = 0 +while loop_index < 20 do + local t = os.time()*1000 + local v = loop_index + res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) + + if res.code ~=0 then + print(res.error) + return + else + print("insert successfully, affected:"..res.affected) + end + os.execute("sleep " .. 1) + loop_index = loop_index + 1 +end + +driver.close_stream(stream) driver.close(conn)