Merge remote-tracking branch 'origin/3.0' into fix/tsim

This commit is contained in:
Shengliang Guan 2022-07-02 18:51:07 +08:00
commit a08945df13
48 changed files with 897 additions and 612 deletions

View File

@ -98,12 +98,12 @@ ELSE ()
ENDIF ()
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=0")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=0")
ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")

View File

@ -65,7 +65,7 @@ typedef struct SQueryExecRes {
} SQueryExecRes;
typedef struct SIndexMeta {
#ifdef WINDOWS
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors;
#endif

View File

@ -41,7 +41,6 @@ extern "C" {
#include <sys/types.h>
#include <termios.h>
#include <sys/statvfs.h>
#include <sys/prctl.h>
#include <sys/shm.h>
#include <sys/wait.h>

View File

@ -63,7 +63,7 @@ int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_add_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_add_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_add_fetch_64(int64_t volatile *ptr, int64_t val);
void *atomic_add_fetch_ptr(void *ptr, void *val);
void *atomic_add_fetch_ptr(void *ptr, int64_t val);
int8_t atomic_fetch_add_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_add_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_add_32(int32_t volatile *ptr, int32_t val);
@ -73,7 +73,7 @@ int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_sub_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_sub_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_sub_fetch_64(int64_t volatile *ptr, int64_t val);
void *atomic_sub_fetch_ptr(void *ptr, void *val);
void *atomic_sub_fetch_ptr(void *ptr, int64_t val);
int8_t atomic_fetch_sub_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_sub_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_sub_32(int32_t volatile *ptr, int32_t val);

View File

@ -24,7 +24,9 @@ extern "C" {
#if defined(_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
// typedef struct tsem_s *tsem_t;
typedef struct bosal_sem_t *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
@ -51,11 +53,11 @@ int tsem_timewait(tsem_t *sim, int64_t nanosecs);
// #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
#define TdThreadSpinlock TdThreadMutex
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
#define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
#define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
#define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
// #define TdThreadSpinlock TdThreadMutex
// #define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
// #define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
// #define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
// #define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
#endif
bool taosCheckPthreadValid(TdThread thread);

View File

@ -64,7 +64,6 @@
#include <osEok.h>
#else
#include <netinet/in.h>
#include <sys/epoll.h>
#endif
#endif
@ -77,15 +76,12 @@ typedef int socklen_t;
#define TAOS_EPOLL_WAIT_TIME 100
typedef SOCKET eventfd_t;
#define eventfd(a, b) -1
#define EpollClose(pollFd) epoll_close(pollFd)
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
#elif defined(_TD_DARWIN_64)
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#else
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
@ -122,14 +118,6 @@ typedef SOCKET EpollFd;
typedef int32_t SocketFd;
typedef SocketFd EpollFd;
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
#endif
int refId;
SocketFd fd;
} * TdSocketPtr, TdSocket;
typedef struct TdSocketServer *TdSocketServerPtr;
typedef struct TdSocket * TdSocketPtr;
typedef struct TdEpoll * TdEpollPtr;
@ -181,11 +169,6 @@ void taosSetMaskSIGPIPE();
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
TdEpollPtr taosCreateEpoll(int32_t size);
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event);
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout);
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll);
#ifdef __cplusplus
}
#endif

View File

@ -188,27 +188,27 @@ int32_t taosThreadJoin(TdThread thread, void **valuePtr);
int32_t taosThreadKeyCreate(TdThreadKey * key, void(*destructor)(void *));
int32_t taosThreadKeyDelete(TdThreadKey key);
int32_t taosThreadKill(TdThread thread, int32_t sig);
int32_t taosThreadMutexConsistent(TdThreadMutex* mutex);
// int32_t taosThreadMutexConsistent(TdThreadMutex* mutex);
int32_t taosThreadMutexDestroy(TdThreadMutex * mutex);
int32_t taosThreadMutexInit(TdThreadMutex * mutex, const TdThreadMutexAttr * attr);
int32_t taosThreadMutexLock(TdThreadMutex * mutex);
int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime);
// int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime);
int32_t taosThreadMutexTryLock(TdThreadMutex * mutex);
int32_t taosThreadMutexUnlock(TdThreadMutex * mutex);
int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr * attr);
int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr * attr, int32_t *pshared);
int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust);
// int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust);
int32_t taosThreadMutexAttrGetType(const TdThreadMutexAttr * attr, int32_t *kind);
int32_t taosThreadMutexAttrInit(TdThreadMutexAttr * attr);
int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr * attr, int32_t pshared);
int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust);
// int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust);
int32_t taosThreadMutexAttrSetType(TdThreadMutexAttr * attr, int32_t kind);
int32_t taosThreadOnce(TdThreadOnce * onceControl, void(*initRoutine)(void));
int32_t taosThreadRwlockDestroy(TdThreadRwlock * rwlock);
int32_t taosThreadRwlockInit(TdThreadRwlock * rwlock, const TdThreadRwlockAttr * attr);
int32_t taosThreadRwlockRdlock(TdThreadRwlock * rwlock);
int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
// int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
// int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
int32_t taosThreadRwlockTryRdlock(TdThreadRwlock * rwlock);
int32_t taosThreadRwlockTryWrlock(TdThreadRwlock * rwlock);
int32_t taosThreadRwlockUnlock(TdThreadRwlock * rwlock);

View File

@ -83,6 +83,7 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define NCHAR_WIDTH_TO_BYTES(n) ((n) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE)

View File

@ -589,7 +589,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
return TSDB_CODE_FAILED;
}
newColData[len] = 0;
int32_t ret = taosParseTime(newColData, timeVal, len + 1, (int32_t)timePrec, tsDaylight);
int32_t ret = taosParseTime(newColData, timeVal, len, (int32_t)timePrec, tsDaylight);
if (ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(newColData);
return ret;

View File

@ -845,11 +845,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t offset = getDataStartOffset();
int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
#if defined(_TD_DARWIN_64)
int64_t written = taosFSendFile(pDestBuf->pFile->fp, pSrcBuf->pFile->fp, &offset, size);
#else
int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);
#endif
if (written == -1 || written != size) {
return -1;

View File

@ -87,7 +87,7 @@ typedef struct {
} STelemMgmt;
typedef struct {
sem_t syncSem;
tsem_t syncSem;
int64_t sync;
bool standby;
SReplica replica;

View File

@ -241,7 +241,7 @@ struct SVnode {
tsem_t canCommit;
int64_t sync;
int32_t syncCount;
sem_t syncSem;
tsem_t syncSem;
SQHandle* pQuery;
};

View File

@ -278,7 +278,7 @@ typedef struct SCtgAsyncFps {
typedef struct SCtgApiStat {
#ifdef WINDOWS
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors;
#endif

View File

@ -1,23 +1,25 @@
MESSAGE(STATUS "build catalog unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
catalogTest
PUBLIC os util common catalog transport gtest qcom taos_static
)
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
catalogTest
PUBLIC os util common catalog transport gtest qcom taos_static
)
TARGET_INCLUDE_DIRECTORIES(
catalogTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/catalog/inc"
)
TARGET_INCLUDE_DIRECTORIES(
catalogTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/catalog/inc"
)
# add_test(
# NAME catalogTest
# COMMAND catalogTest
# )
# add_test(
# NAME catalogTest
# COMMAND catalogTest
# )
ENDIF()

View File

@ -35,7 +35,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) {
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle, void* pParam) {
switch (nodeType(pDataSink)) {
switch ((int)nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:

View File

@ -4101,7 +4101,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
SValueNode* pValue = (SValueNode*)pNew;
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
isNull[index++] = 1;
continue;
} else {

View File

@ -1446,19 +1446,9 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// The number of parameters has been limited by the syntax definition
// uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
// The function return type has been set during syntax parsing
uint8_t para2Type = pFunc->node.resType.type;
// if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT &&
// para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR &&
// para2Type != TSDB_DATA_TYPE_TIMESTAMP) {
// return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
// }
// if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) ||
// (para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) {
// return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
// }
int32_t para2Bytes = pFunc->node.resType.bytes;
if (IS_VAR_DATA_TYPE(para2Type)) {
@ -1468,6 +1458,11 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"CAST function converted length should be in range [0, 1000]");
}
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
return TSDB_CODE_SUCCESS;
}

View File

@ -17,7 +17,7 @@
FstSparseSet *sparSetCreate(int32_t sz) {
FstSparseSet *ss = taosMemoryCalloc(1, sizeof(FstSparseSet));
if (ss = NULL) {
if (ss == NULL) {
return NULL;
}

View File

@ -75,18 +75,18 @@ void MonitorTest::GetSysInfo(SMonSysInfo *pInfo) {
pInfo->cpu_engine = 2.1;
pInfo->cpu_system = 2.1;
pInfo->cpu_cores = 2;
pInfo->mem_engine = 3.1;
pInfo->mem_system = 3.2;
pInfo->mem_total = 3.3;
pInfo->disk_engine = 4.1;
pInfo->disk_used = 4.2;
pInfo->disk_total = 4.3;
pInfo->net_in = 5.1;
pInfo->net_out = 5.2;
pInfo->io_read = 6.1;
pInfo->io_write = 6.2;
pInfo->io_read_disk = 7.1;
pInfo->io_write_disk = 7.2;
pInfo->mem_engine = 3;
pInfo->mem_system = 3;
pInfo->mem_total = 3;
pInfo->disk_engine = 4;
pInfo->disk_used = 4;
pInfo->disk_total = 4;
pInfo->net_in = 5;
pInfo->net_out = 5;
pInfo->io_read = 6;
pInfo->io_write = 6;
pInfo->io_read_disk = 7;
pInfo->io_write_disk = 7;
}
void MonitorTest::GetClusterInfo(SMonClusterInfo *pInfo) {

View File

@ -1,32 +1,34 @@
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
TARGET_INCLUDE_DIRECTORIES(
parserTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/parser/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/parser/inc"
)
TARGET_LINK_LIBRARIES(
parserTest
PUBLIC os util common nodes parser catalog transport gtest function planner qcom
)
if(${BUILD_WINGETOPT})
target_include_directories(
parserTest
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
TARGET_INCLUDE_DIRECTORIES(
parserTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/parser/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/parser/inc"
)
target_link_libraries(parserTest PUBLIC wingetopt)
endif()
add_test(
NAME parserTest
COMMAND parserTest
)
TARGET_LINK_LIBRARIES(
parserTest
PUBLIC os util common nodes parser catalog transport gtest function planner qcom
)
if(${BUILD_WINGETOPT})
target_include_directories(
parserTest
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
)
target_link_libraries(parserTest PUBLIC wingetopt)
endif()
add_test(
NAME parserTest
COMMAND parserTest
)
ENDIF()

View File

@ -1,18 +1,19 @@
MESSAGE(STATUS "build qworker unit test")
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
qworkerTest
PUBLIC os util common transport gtest qcom nodes planner qworker executor
)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
qworkerTest
PUBLIC os util common transport gtest qcom nodes planner qworker executor
)
TARGET_INCLUDE_DIRECTORIES(
qworkerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/qworker/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/qworker/inc"
)
TARGET_INCLUDE_DIRECTORIES(
qworkerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/qworker/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/qworker/inc"
)
ENDIF()

View File

@ -196,7 +196,7 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
return 0;
default:
assert(0);
return 0;
}
}
@ -222,7 +222,7 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
terrno = TSDB_CODE_QRY_JSON_IN_ERROR;
return 0;
default:
assert(0);
return 0;
}
}

View File

@ -109,9 +109,8 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
}
if (IS_VAR_DATA_TYPE(type)) {
char* data = colDataGetVarData(out.columnData, 0);
len = varDataLen(data);
buf = varDataVal(data);
buf = colDataGetVarData(out.columnData, 0);
len = varDataTLen(data);
} else {
len = tDataTypes[type].bytes;
buf = out.columnData->pData;
@ -119,8 +118,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
} else {
buf = nodesGetValueFromNode(valueNode);
if (IS_VAR_DATA_TYPE(type)) {
len = varDataLen(buf);
buf = varDataVal(buf);
len = varDataTLen(buf);
} else {
len = valueNode->node.resType.bytes;
}
@ -194,7 +192,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
param->numOfRows = 1;
param->columnData = sclCreateColumnInfoData(&valueNode->node.resType, 1);
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
colDataAppendNULL(param->columnData, 0);
} else {
colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
@ -345,7 +343,7 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
return -1;
}
switch (nodeType(pNode)) {
switch ((int)nodeType(pNode)) {
case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)pNode;
return valueNode->node.resType.type;
@ -538,6 +536,14 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
int32_t rowNum = 0;
int32_t code = 0;
// json not support in in operator
if(nodeType(node->pLeft) == QUERY_NODE_VALUE){
SValueNode *valueNode = (SValueNode *)node->pLeft;
if(valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)){
SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
}
}
SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
output->columnData = sclCreateColumnInfoData(&node->node.resType, rowNum);
if (output->columnData == NULL) {
@ -777,7 +783,12 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
res->translate = true;
if (colDataIsNull_s(output.columnData, 0)) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
if(node->node.resType.type != TSDB_DATA_TYPE_JSON){
res->node.resType.type = TSDB_DATA_TYPE_NULL;
}else{
res->node.resType = node->node.resType;
res->isNull = true;
}
} else {
res->node.resType = node->node.resType;
int32_t type = output.columnData->info.type;

View File

@ -934,9 +934,16 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t timeVal;
if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) {
//convert to 0
*(int64_t *)output = 0;
int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
int32_t ret = convertStringToTimestamp(inputType, input, timePrec, &timeVal);
if (ret != TSDB_CODE_SUCCESS) {
*(int64_t *)output = 0;
} else {
*(int64_t *)output = timeVal;
}
} else {
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
}

View File

@ -1,18 +1,20 @@
MESSAGE(STATUS "build filter unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(filterTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
filterTest
PUBLIC os util common gtest qcom function nodes scalar
)
ADD_EXECUTABLE(filterTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
filterTest
PUBLIC os util common gtest qcom function nodes scalar
)
TARGET_INCLUDE_DIRECTORIES(
filterTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
)
TARGET_INCLUDE_DIRECTORIES(
filterTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
)
ENDIF()

View File

@ -1,23 +1,25 @@
MESSAGE(STATUS "build scalar unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(scalarTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
scalarTest
PUBLIC os util common gtest qcom function nodes scalar parser catalog transport
)
ADD_EXECUTABLE(scalarTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
scalarTest
PUBLIC os util common gtest qcom function nodes scalar parser catalog transport
)
TARGET_INCLUDE_DIRECTORIES(
scalarTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar/"
PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
)
add_test(
NAME scalarTest
COMMAND scalarTest
)
TARGET_INCLUDE_DIRECTORIES(
scalarTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar/"
PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
)
add_test(
NAME scalarTest
COMMAND scalarTest
)
ENDIF()

View File

@ -68,7 +68,7 @@ typedef struct SSchHbTrans {
typedef struct SSchApiStat {
#ifdef WINDOWS
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors;
#endif
@ -76,7 +76,7 @@ typedef struct SSchApiStat {
typedef struct SSchRuntimeStat {
#ifdef WINDOWS
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors;
#endif
@ -84,7 +84,7 @@ typedef struct SSchRuntimeStat {
typedef struct SSchJobStat {
#ifdef WINDOWS
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors;
#endif

View File

@ -1,18 +1,20 @@
MESSAGE(STATUS "build scheduler unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
schedulerTest
PUBLIC os util common catalog transport gtest qcom taos_static planner scheduler
)
ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
schedulerTest
PUBLIC os util common catalog transport gtest qcom taos_static planner scheduler
)
TARGET_INCLUDE_DIRECTORIES(
schedulerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scheduler/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scheduler/inc"
)
TARGET_INCLUDE_DIRECTORIES(
schedulerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/scheduler/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scheduler/inc"
)
ENDIF()

View File

@ -18,7 +18,7 @@
int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags) {
// not support read-committed version at the moment
ASSERT(flags == 0 || flags == TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
ASSERT(flags == 0 || flags == (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
pTxn->flags = flags;
pTxn->txnId = txnid;

View File

@ -39,12 +39,16 @@ endif()
target_link_libraries(
os PUBLIC pthread
)
if(NOT TD_WINDOWS)
target_link_libraries(
os PUBLIC dl m rt
)
else()
if(TD_WINDOWS)
target_link_libraries(
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm
)
endif(NOT TD_WINDOWS)
elseif(TD_DARWIN_64)
target_link_libraries(
os PUBLIC dl m iconv
)
else()
target_link_libraries(
os PUBLIC dl m rt
)
endif()

View File

@ -518,7 +518,7 @@ int64_t atomic_add_fetch_64(int64_t volatile *ptr, int64_t val) {
#endif
}
void* atomic_add_fetch_ptr(void *ptr, void *val) {
void* atomic_add_fetch_ptr(void *ptr, int64_t val) {
#ifdef WINDOWS
return interlocked_add_fetch_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
@ -618,11 +618,13 @@ int64_t atomic_sub_fetch_64(int64_t volatile *ptr, int64_t val) {
#endif
}
void* atomic_sub_fetch_ptr(void *ptr, void* val) {
void* atomic_sub_fetch_ptr(void *ptr, int64_t val) {
#ifdef WINDOWS
return interlocked_sub_fetch_ptr(ptr, val);
#elif defined(_TD_NINGSI_60)
return __sync_sub_and_fetch((ptr), (val));
#elif defined(_TD_DARWIN_64)
return __atomic_sub_fetch((void **)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_sub_fetch((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -673,6 +675,8 @@ void* atomic_fetch_sub_ptr(void *ptr, void* val) {
return interlocked_fetch_sub_ptr(ptr, val);
#elif defined(_TD_NINGSI_60)
return __sync_fetch_and_sub((ptr), (val));
#elif defined(_TD_DARWIN_64)
return __atomic_fetch_sub((void **)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_fetch_sub((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -723,6 +727,8 @@ void* atomic_and_fetch_ptr(void *ptr, void *val) {
return interlocked_and_fetch_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_and_and_fetch((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_and_fetch((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_and_fetch((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -773,6 +779,8 @@ void* atomic_fetch_and_ptr(void *ptr, void *val) {
return interlocked_fetch_and_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_fetch_and_and((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_fetch_and((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_fetch_and((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -823,6 +831,8 @@ void* atomic_or_fetch_ptr(void *ptr, void *val) {
return interlocked_or_fetch_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_or_and_fetch((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_or_fetch((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_or_fetch((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -873,6 +883,8 @@ void* atomic_fetch_or_ptr(void *ptr, void *val) {
return interlocked_fetch_or_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_fetch_and_or((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_fetch_or((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_fetch_or((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -923,6 +935,8 @@ void* atomic_xor_fetch_ptr(void *ptr, void *val) {
return interlocked_xor_fetch_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_xor_and_fetch((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_xor_fetch((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_xor_fetch((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif
@ -973,6 +987,8 @@ void* atomic_fetch_xor_ptr(void *ptr, void *val) {
return interlocked_fetch_xor_ptr((void* volatile*)(ptr), (void*)(val));
#elif defined(_TD_NINGSI_60)
return __sync_fetch_and_xor((ptr), (val));
#elif defined(_TD_DARWIN_64)
return (void*)__atomic_fetch_xor((size_t *)(ptr), (size_t)(val), __ATOMIC_SEQ_CST);
#else
return __atomic_fetch_xor((void **)(ptr), (val), __ATOMIC_SEQ_CST);
#endif

View File

@ -35,6 +35,8 @@
#include <unistd.h>
#define LINUX_FILE_NO_TEXT_OPTION 0
#define O_TEXT LINUX_FILE_NO_TEXT_OPTION
#define _SEND_FILE_STEP_ 1000
#endif
#if defined(WINDOWS)
@ -612,28 +614,34 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
#elif defined(_TD_DARWIN_64)
int r = 0;
if (offset) {
r = fseek(in_file, *offset, SEEK_SET);
if (r == -1) return -1;
}
off_t len = size;
while (len > 0) {
char buf[1024 * 16];
off_t n = sizeof(buf);
if (len < n) n = len;
size_t m = fread(buf, 1, n, in_file);
if (m < n) {
int e = ferror(in_file);
if (e) return -1;
lseek(pFileIn->fd, (int32_t)(*offset), 0);
int64_t writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = {0};
for (int64_t len = 0; len < (size - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = read(pFileIn->fd, (void *)buffer, _SEND_FILE_STEP_);
if (rlen <= 0) {
return writeLen;
} else if (rlen < _SEND_FILE_STEP_) {
write(pFileOut->fd, (void *)buffer, (uint32_t)rlen);
return (int64_t)(writeLen + rlen);
} else {
write(pFileOut->fd, (void *)buffer, _SEND_FILE_STEP_);
writeLen += _SEND_FILE_STEP_;
}
if (m == 0) break;
if (m != fwrite(buf, 1, m, out_file)) {
return -1;
}
len -= m;
}
return size - len;
int64_t remain = size - writeLen;
if (remain > 0) {
size_t rlen = read(pFileIn->fd, (void *)buffer, (size_t)remain);
if (rlen <= 0) {
return writeLen;
} else {
write(pFileOut->fd, (void *)buffer, (uint32_t)remain);
writeLen += remain;
}
}
return writeLen;
#else

View File

@ -14,7 +14,11 @@
*/
#define ALLOW_FORBID_FUNC
#ifdef _TD_DARWIN_64
#include <malloc/malloc.h>
#else
#include <malloc.h>
#endif
#include "os.h"
#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
@ -323,6 +327,8 @@ int32_t taosMemorySize(void *ptr) {
#else
#ifdef WINDOWS
return _msize(ptr);
#elif defined(_TD_DARWIN_64)
return malloc_size(ptr);
#else
return malloc_usable_size(ptr);
#endif

View File

@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#include "pthread.h"
#ifdef WINDOWS
@ -111,289 +113,501 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX
#define SEM_USE_SEM
// #define SEM_USE_SEM
#ifdef SEM_USE_SEM
#include <mach/mach_error.h>
#include <mach/mach_init.h>
#include <mach/semaphore.h>
#include <mach/task.h>
// #ifdef SEM_USE_SEM
// #include <mach/mach_error.h>
// #include <mach/mach_init.h>
// #include <mach/semaphore.h>
// #include <mach/task.h>
static TdThread sem_thread;
static TdThreadOnce sem_once;
static task_t sem_port;
static volatile int sem_inited = 0;
static semaphore_t sem_exit;
// static TdThread sem_thread;
// static TdThreadOnce sem_once;
// static task_t sem_port;
// static volatile int sem_inited = 0;
// static semaphore_t sem_exit;
static void *sem_thread_routine(void *arg) {
(void)arg;
setThreadName("sem_thrd");
// static void *sem_thread_routine(void *arg) {
// (void)arg;
// setThreadName("sem_thrd");
sem_port = mach_task_self();
kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
sem_inited = -1;
return NULL;
}
sem_inited = 1;
semaphore_wait(sem_exit);
return NULL;
}
// sem_port = mach_task_self();
// kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
// if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// sem_inited = -1;
// return NULL;
// }
// sem_inited = 1;
// semaphore_wait(sem_exit);
// return NULL;
// }
static void once_init(void) {
int r = 0;
r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL);
if (r) {
fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
return;
}
while (sem_inited == 0) {
;
}
}
#endif
// static void once_init(void) {
// int r = 0;
// r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL);
// if (r) {
// fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// return;
// }
// while (sem_inited == 0) {
// ;
// }
// }
// #endif
struct tsem_s {
#ifdef SEM_USE_PTHREAD
TdThreadMutex lock;
TdThreadCond cond;
volatile int64_t val;
#elif defined(SEM_USE_POSIX)
size_t id;
sem_t *sem;
#elif defined(SEM_USE_SEM)
semaphore_t sem;
#else // SEM_USE_PTHREAD
dispatch_semaphore_t sem;
#endif // SEM_USE_PTHREAD
// struct tsem_s {
// #ifdef SEM_USE_PTHREAD
// TdThreadMutex lock;
// TdThreadCond cond;
// volatile int64_t val;
// #elif defined(SEM_USE_POSIX)
// size_t id;
// sem_t *sem;
// #elif defined(SEM_USE_SEM)
// semaphore_t sem;
// #else // SEM_USE_PTHREAD
// dispatch_semaphore_t sem;
// #endif // SEM_USE_PTHREAD
volatile unsigned int valid : 1;
};
// volatile unsigned int valid : 1;
// };
int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if (*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
if (!p) {
fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
// int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
// if (!p) {
// fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
#ifdef SEM_USE_PTHREAD
int r = taosThreadMutexInit(&p->lock, NULL);
do {
if (r) break;
r = taosThreadCondInit(&p->cond, NULL);
if (r) {
taosThreadMutexDestroy(&p->lock);
break;
// #ifdef SEM_USE_PTHREAD
// int r = taosThreadMutexInit(&p->lock, NULL);
// do {
// if (r) break;
// r = taosThreadCondInit(&p->cond, NULL);
// if (r) {
// taosThreadMutexDestroy(&p->lock);
// break;
// }
// p->val = value;
// } while (0);
// if (r) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #elif defined(SEM_USE_POSIX)
// static size_t tick = 0;
// do {
// size_t id = atomic_add_fetch_64(&tick, 1);
// if (id == SEM_VALUE_MAX) {
// atomic_store_64(&tick, 0);
// id = 0;
// }
// char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t%ld", id);
// p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value);
// p->id = id;
// if (p->sem != SEM_FAILED) break;
// int e = errno;
// if (e == EEXIST) continue;
// if (e == EINTR) continue;
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// e, strerror(e));
// abort();
// } while (p->sem == SEM_FAILED);
// #elif defined(SEM_USE_SEM)
// taosThreadOnce(&sem_once, once_init);
// if (sem_inited != 1) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, sem);
// errno = ENOMEM;
// return -1;
// }
// kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
// if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// // we fail-fast here, because we have less-doc about semaphore_create for the moment
// abort();
// }
// #else // SEM_USE_PTHREAD
// p->sem = dispatch_semaphore_create(value);
// if (p->sem == NULL) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #endif // SEM_USE_PTHREAD
// p->valid = 1;
// *sem = p;
// return 0;
// }
// int tsem_wait(tsem_t *sem) {
// if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->val -= 1;
// if (p->val < 0) {
// if (taosThreadCondWait(&p->cond, &p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// return 0;
// #elif defined(SEM_USE_POSIX)
// return sem_wait(p->sem);
// #elif defined(SEM_USE_SEM)
// return semaphore_wait(p->sem);
// #else // SEM_USE_PTHREAD
// return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
// #endif // SEM_USE_PTHREAD
// }
// int tsem_post(tsem_t *sem) {
// if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->val += 1;
// if (p->val <= 0) {
// if (taosThreadCondSignal(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// return 0;
// #elif defined(SEM_USE_POSIX)
// return sem_post(p->sem);
// #elif defined(SEM_USE_SEM)
// return semaphore_signal(p->sem);
// #else // SEM_USE_PTHREAD
// return dispatch_semaphore_signal(p->sem);
// #endif // SEM_USE_PTHREAD
// }
// int tsem_destroy(tsem_t *sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (!*sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// // abort();
// return 0;
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// // sem); abort();
// return 0;
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->valid = 0;
// if (taosThreadCondDestroy(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// if (taosThreadMutexDestroy(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// #elif defined(SEM_USE_POSIX)
// char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t%ld", p->id);
// int r = sem_unlink(name);
// if (r) {
// int e = errno;
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// e, strerror(e));
// abort();
// }
// #elif defined(SEM_USE_SEM)
// semaphore_destroy(sem_port, p->sem);
// #else // SEM_USE_PTHREAD
// #endif // SEM_USE_PTHREAD
// p->valid = 0;
// taosMemoryFree(p);
// *sem = NULL;
// return 0;
// }
typedef struct
{
pthread_mutex_t count_lock;
pthread_cond_t count_bump;
unsigned int count;
}bosal_sem_t;
int tsem_init(tsem_t *psem, int flags, unsigned int count)
{
bosal_sem_t *pnewsem;
int result;
pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t));
if (! pnewsem)
{
return -1;
}
p->val = value;
} while (0);
if (r) {
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
#elif defined(SEM_USE_POSIX)
static size_t tick = 0;
do {
size_t id = atomic_add_fetch_64(&tick, 1);
if (id == SEM_VALUE_MAX) {
atomic_store_64(&tick, 0);
id = 0;
result = pthread_mutex_init(&pnewsem->count_lock, NULL);
if (result)
{
free(pnewsem);
return result;
}
char name[NAME_MAX - 4];
snprintf(name, sizeof(name), "/t%ld", id);
p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value);
p->id = id;
if (p->sem != SEM_FAILED) break;
int e = errno;
if (e == EEXIST) continue;
if (e == EINTR) continue;
fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
e, strerror(e));
abort();
} while (p->sem == SEM_FAILED);
#elif defined(SEM_USE_SEM)
taosThreadOnce(&sem_once, once_init);
if (sem_inited != 1) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
__func__, sem);
errno = ENOMEM;
return -1;
}
kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort();
}
#else // SEM_USE_PTHREAD
p->sem = dispatch_semaphore_create(value);
if (p->sem == NULL) {
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
#endif // SEM_USE_PTHREAD
p->valid = 1;
*sem = p;
return 0;
}
int tsem_wait(tsem_t *sem) {
if (!*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
struct tsem_s *p = *sem;
if (!p->valid) {
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
#ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
p->val -= 1;
if (p->val < 0) {
if (taosThreadCondWait(&p->cond, &p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
result = pthread_cond_init(&pnewsem->count_bump, NULL);
if (result)
{
pthread_mutex_destroy(&pnewsem->count_lock);
free(pnewsem);
return result;
}
}
if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
return 0;
#elif defined(SEM_USE_POSIX)
return sem_wait(p->sem);
#elif defined(SEM_USE_SEM)
return semaphore_wait(p->sem);
#else // SEM_USE_PTHREAD
return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
#endif // SEM_USE_PTHREAD
}
int tsem_post(tsem_t *sem) {
if (!*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
struct tsem_s *p = *sem;
if (!p->valid) {
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
abort();
}
#ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
p->val += 1;
if (p->val <= 0) {
if (taosThreadCondSignal(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
}
if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
return 0;
#elif defined(SEM_USE_POSIX)
return sem_post(p->sem);
#elif defined(SEM_USE_SEM)
return semaphore_signal(p->sem);
#else // SEM_USE_PTHREAD
return dispatch_semaphore_signal(p->sem);
#endif // SEM_USE_PTHREAD
}
int tsem_destroy(tsem_t *sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
pnewsem->count = count;
*psem = (tsem_t)pnewsem;
return 0;
}
struct tsem_s *p = *sem;
if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort();
}
int tsem_destroy(tsem_t *psem)
{
bosal_sem_t *poldsem;
if (! psem)
{
return EINVAL;
}
poldsem = (bosal_sem_t *)*psem;
pthread_mutex_destroy(&poldsem->count_lock);
pthread_cond_destroy(&poldsem->count_bump);
free(poldsem);
return 0;
}
#ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
p->valid = 0;
if (taosThreadCondDestroy(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
if (taosThreadMutexDestroy(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort();
}
#elif defined(SEM_USE_POSIX)
char name[NAME_MAX - 4];
snprintf(name, sizeof(name), "/t%ld", p->id);
int r = sem_unlink(name);
if (r) {
int e = errno;
fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
e, strerror(e));
abort();
}
#elif defined(SEM_USE_SEM)
semaphore_destroy(sem_port, p->sem);
#else // SEM_USE_PTHREAD
#endif // SEM_USE_PTHREAD
p->valid = 0;
taosMemoryFree(p);
*sem = NULL;
return 0;
}
bool taosCheckPthreadValid(TdThread thread) {
uint64_t id = 0;
int r = TdThreadhreadid_np(thread, &id);
return r ? false : true;
int tsem_post(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
pxsem->count = pxsem->count + 1;
xresult = pthread_cond_signal(&pxsem->count_bump);
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
}
int tsem_trywait(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
if (pxsem->count > 0)
{
pxsem->count--;
}
else
{
xresult = EAGAIN;
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
}
int tsem_wait(tsem_t *psem)
{
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
if (pxsem->count == 0)
{
xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock);
}
if (! xresult)
{
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
}
int tsem_timewait(tsem_t *psem, int64_t nanosecs)
{
struct timespec abstim = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
bosal_sem_t *pxsem;
int result, xresult;
if (! psem)
{
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result)
{
return result;
}
xresult = 0;
if (pxsem->count == 0)
{
xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim);
}
if (! xresult)
{
if (pxsem->count > 0)
{
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result)
{
return result;
}
if (xresult)
{
errno = xresult;
return -1;
}
return 0;
}
bool taosCheckPthreadValid(TdThread thread) {
int32_t ret = taosThreadKill(thread, 0);
if (ret == ESRCH) return false;
if (ret == EINVAL) return false;
// alive
return true;
}
int64_t taosGetSelfPthreadId() {
uint64_t id;
TdThreadhreadid_np(0, &id);
return (int64_t)id;
TdThread thread = taosThreadSelf();
return (int64_t)thread;
}
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }

View File

@ -73,6 +73,10 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); }
void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); }
void taosKillChildOnParentStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); }
void taosKillChildOnParentStopped() {
#ifndef _TD_DARWIN_64
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
}
#endif

View File

@ -49,6 +49,14 @@
#define INVALID_SOCKET -1
#endif
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
#endif
int refId;
SocketFd fd;
} * TdSocketPtr, TdSocket;
typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
@ -1029,60 +1037,6 @@ int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *a
return getsockname(pSocket->fd, destAddr, addrLen);
}
TdEpollPtr taosCreateEpoll(int32_t size) {
EpollFd fd = -1;
#ifdef WINDOWS
assert(0);
#else
fd = epoll_create(size);
#endif
if (fd < 0) {
return NULL;
}
TdEpollPtr pEpoll = (TdEpollPtr)taosMemoryMalloc(sizeof(TdEpoll));
if (pEpoll == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pEpoll->fd = fd;
pEpoll->refId = 0;
return pEpoll;
}
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
int32_t code = -1;
if (pEpoll == NULL || pEpoll->fd < 0) {
return -1;
}
#ifdef WINDOWS
assert(0);
#else
code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
return code;
}
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
int32_t code = -1;
if (pEpoll == NULL || pEpoll->fd < 0) {
return -1;
}
#ifdef WINDOWS
assert(0);
#else
code = epoll_wait(pEpoll->fd, event, maxEvents, timeout);
#endif
return code;
}
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
int32_t code;
if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
return -1;
}
code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
(*ppEpoll)->fd = -1;
taosMemoryFree(*ppEpoll);
return code;
}
/*
* Set TCP connection timeout per-socket level.
* ref [https://github.com/libuv/help/issues/54]
@ -1100,6 +1054,11 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
return -1;
}
#elif defined(_TD_DARWIN_64)
uint32_t conn_timeout_ms = timeout * 1000;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
return -1;
}
#else // Linux like systems
uint32_t conn_timeout_ms = timeout * 1000;
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {

View File

@ -157,9 +157,9 @@ int32_t taosThreadKill(TdThread thread, int32_t sig) {
return pthread_kill(thread, sig);
}
int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
return pthread_mutex_consistent(mutex);
}
// int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
// return pthread_mutex_consistent(mutex);
// }
int32_t taosThreadMutexDestroy(TdThreadMutex * mutex) {
return pthread_mutex_destroy(mutex);
@ -173,9 +173,9 @@ int32_t taosThreadMutexLock(TdThreadMutex * mutex) {
return pthread_mutex_lock(mutex);
}
int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
return pthread_mutex_timedlock(mutex, abstime);
}
// int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
// return pthread_mutex_timedlock(mutex, abstime);
// }
int32_t taosThreadMutexTryLock(TdThreadMutex * mutex) {
return pthread_mutex_trylock(mutex);
@ -193,9 +193,9 @@ int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr * attr, int32_t *p
return pthread_mutexattr_getpshared(attr, pshared);
}
int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust) {
return pthread_mutexattr_getrobust(attr, robust);
}
// int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust) {
// return pthread_mutexattr_getrobust(attr, robust);
// }
int32_t taosThreadMutexAttrGetType(const TdThreadMutexAttr * attr, int32_t *kind) {
return pthread_mutexattr_gettype(attr, kind);
@ -209,9 +209,9 @@ int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr * attr, int32_t pshared)
return pthread_mutexattr_setpshared(attr, pshared);
}
int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust) {
return pthread_mutexattr_setrobust(attr, robust);
}
// int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust) {
// return pthread_mutexattr_setrobust(attr, robust);
// }
int32_t taosThreadMutexAttrSetType(TdThreadMutexAttr * attr, int32_t kind) {
return pthread_mutexattr_settype(attr, kind);
@ -233,13 +233,13 @@ int32_t taosThreadRwlockRdlock(TdThreadRwlock * rwlock) {
return pthread_rwlock_rdlock(rwlock);
}
int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
return pthread_rwlock_timedrdlock(rwlock, abstime);
}
// int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
// return pthread_rwlock_timedrdlock(rwlock, abstime);
// }
int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
return pthread_rwlock_timedwrlock(rwlock, abstime);
}
// int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
// return pthread_rwlock_timedwrlock(rwlock, abstime);
// }
int32_t taosThreadRwlockTryRdlock(TdThreadRwlock * rwlock) {
return pthread_rwlock_tryrdlock(rwlock);
@ -303,7 +303,7 @@ int32_t taosThreadSpinDestroy(TdThreadSpinlock * lock) {
int32_t taosThreadSpinInit(TdThreadSpinlock * lock, int32_t pshared) {
#ifdef TD_USE_SPINLOCK_AS_MUTEX
assert(pshared == NULL);
assert(pshared == 0);
return pthread_mutex_init((pthread_mutex_t*)lock, NULL);
#else
return pthread_spin_init((pthread_spinlock_t*)lock, pshared);

View File

@ -417,8 +417,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if (pNode == NULL) {
pushfrontNodeInEntryList(pe, pNode1);
atomic_add_fetch_64(&pCacheObj->numOfElems, 1);
atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
atomic_add_fetch_ptr(&pCacheObj->numOfElems, 1);
atomic_add_fetch_ptr(&pCacheObj->sizeInBytes, pNode1->size);
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
@ -667,7 +667,7 @@ void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *p
pEntry->next = next;
pEntry->num -= 1;
atomic_sub_fetch_64(&pCacheObj->numOfElems, 1);
atomic_sub_fetch_ptr(&pCacheObj->numOfElems, 1);
pNode = next;
}
}

View File

@ -56,11 +56,11 @@ int32_t setChkNotInBytes8(const void *pLeft, const void *pRight) {
}
int32_t compareChkInString(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
return NULL != taosHashGet((SHashObj *)pRight, pLeft, varDataTLen(pLeft)) ? 1 : 0;
}
int32_t compareChkNotInString(const void *pLeft, const void *pRight) {
return NULL == taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
return NULL == taosHashGet((SHashObj *)pRight, pLeft, varDataTLen(pLeft)) ? 1 : 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {

View File

@ -132,7 +132,7 @@ static timer_map_t timerMap;
static uintptr_t getNextTimerId() {
uintptr_t id;
do {
id = (uintptr_t)atomic_add_fetch_ptr((void **)&nextTimerId, (void*)1);
id = (uintptr_t)atomic_add_fetch_ptr((void **)&nextTimerId, 1);
} while (id == 0);
return id;
}

View File

@ -20,18 +20,54 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
tmqCom.initConsumerTable()
tmqCom.create_database(tsql=tdSql, dbName=paraDict["dbName"],dropFlag=paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=paraDict['replica'])
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
@ -43,13 +79,6 @@ class TDTestCase:
topicNameList = ['topic1', 'topic2', 'topic3']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 4 == 0" %(paraDict['dbName'], paraDict['stbName'])
@ -134,16 +163,18 @@ class TDTestCase:
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'db2',
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
@ -155,13 +186,6 @@ class TDTestCase:
topicNameList = ['topic1', 'topic2', 'topic3']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'])
tdLog.info("insert data")
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
# sqlString = "create topic %s as select ts, sin(c1), pow(c2,3) from %s.%s where c2 >= 0" %(topicNameList[0], paraDict['dbName'], paraDict['stbName'])
@ -247,6 +271,7 @@ class TDTestCase:
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()

View File

@ -170,33 +170,42 @@ class TMQCom:
tdLog.debug("complete to create database %s"%(dbName))
return
# self.create_stable() and self.create_ctable() and self.insert_data_interlaceByMultiTbl() : The three functions are matched
# schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
def create_stable(self,tsql, dbName,stbName):
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 int, c2 int, c3 binary(16)) tags(t1 int, t2 binary(32))"%(dbName, stbName))
schemaString = "(ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))"
tsql.execute("create table if not exists %s.%s %s"%(dbName, stbName, schemaString))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
tsql.execute("use %s" %dbName)
# tsql.execute("use %s" %dbName)
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
batchNum = 10
tblBatched = 0
for i in range(ctbNum):
tagValue = 'beijing'
tagBinaryValue = 'beijing'
if (i % 2 == 0):
tagValue = 'shanghai'
tagBinaryValue = 'shanghai'
elif (i % 3 == 0):
tagValue = 'changsha'
tagBinaryValue = 'changsha'
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i+ctbStartIdx,stbName,i+ctbStartIdx+1, tagValue)
if (i > 0) and (i%100 == 0):
sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
tblBatched += 1
if (i == ctbNum-1 ) or (tblBatched == batchNum):
tsql.execute(sql)
tblBatched = 0
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
# schema: (ts timestamp, c1 int, c2 binary(16))
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
@ -208,11 +217,14 @@ class TMQCom:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(stbName,i)
else:
@ -224,6 +236,7 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
# schema: (ts timestamp, c1 int, c2 int, c3 binary(16))
def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
@ -234,14 +247,17 @@ class TMQCom:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
@ -253,6 +269,7 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
# schema: (ts timestamp, c1 int, c2 int, c3 binary(16), c4 timestamp)
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
@ -263,14 +280,17 @@ class TMQCom:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
rowsBatched += 1
if (rowsBatched == batchNum) or (j == rowsPerTbl - 1):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
else:
@ -282,7 +302,8 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
# schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
@ -297,15 +318,22 @@ class TMQCom:
ctbDict[i] = 0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb = 0
rowsOfCtb = 0
while rowsOfCtb < rowsPerTbl:
for i in range(ctbNum):
sql += " %s.%s_%d values "%(dbName,ctbPrefix,i)
sql += " %s.%s%d values "%(dbName,ctbPrefix,i+ctbStartIdx)
rowsBatched = 0
for k in range(batchNum):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + ctbDict[i], ctbDict[i], ctbDict[i])
if (k % 2 == 0):
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i], ctbDict[i],ctbDict[i], ctbDict[i],i+ctbStartIdx,k)
else:
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i],-ctbDict[i],ctbDict[i],-ctbDict[i],i+ctbStartIdx,k)
rowsBatched += 1
ctbDict[i] += 1
if (0 == ctbDict[i]%batchNum) or (ctbDict[i] == rowsPerTbl):
if (rowsBatched == batchNum) or (ctbDict[i] == rowsPerTbl):
tsql.execute(sql)
rowsBatched = 0
sql = "insert into "
break
rowsOfCtb = ctbDict[0]
@ -313,7 +341,18 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
def threadFunctionForInsertByInterlace(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.insert_data_interlaceByMultiTbl(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
return
def asyncInsertDataByInterlace(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForInsertByInterlace, kwargs=paraDict)
pThread.start()
return pThread
def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
tdLog.debug("start to insert data wiht auto create child table ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
@ -324,17 +363,17 @@ class TMQCom:
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
rowsBatched = 0
for i in range(ctbNum):
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i,dbName,stbName,i)
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsOfSql += 1
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsOfSql = 0
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i,dbName,stbName,i)
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
else:
sql = "insert into "
#end sql

View File

@ -26,7 +26,7 @@ class TDTestCase:
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',

View File

@ -26,7 +26,7 @@ class TDTestCase:
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== test case 1: ")
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',

View File

@ -118,8 +118,8 @@ python3 ./test.py -f 2-query/queryQnode.py
# python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
#python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
@ -155,3 +155,5 @@ python3 ./test.py -f 7-tmq/tmqUdf.py
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py

View File

@ -36,7 +36,6 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
int64_t now;
typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
@ -91,6 +90,7 @@ typedef struct {
int32_t consumeDelay; // unit s
int32_t numOfThread;
int32_t useSnapshot;
int64_t nowTime;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;
@ -199,6 +199,8 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.saveRowFlag = 0;
g_stConfInfo.consumeDelay = 5;
g_stConfInfo.nowTime = taosGetTimestampMs();
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
printHelp();
@ -511,10 +513,8 @@ static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
char sqlStr[1024] = {0};
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId,
pInfo->consumerId);
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
@ -591,7 +591,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0};
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName, atomic_fetch_add_64(&now, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
pInfo->consumeRowCnt, pInfo->checkresult);
char tmpString[128];
@ -855,8 +855,6 @@ int32_t getConsumeInfo() {
}
int main(int32_t argc, char* argv[]) {
now = taosGetTimestampMs();
parseArgument(argc, argv);
getConsumeInfo();
saveConfigToLogFile();

View File

@ -13,6 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef _TD_DARWIN_64
#include <pwd.h>
#endif
#include "shellInt.h"
#define SHELL_HOST "The auth string to use when connecting to the server."

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _BSD_SOURCE
#define _GNU_SOURCE
#define _XOPEN_SOURCE