Merge remote-tracking branch 'origin/develop' into feature/crash_gen

This commit is contained in:
Steven Li 2020-08-27 22:09:42 +00:00
commit 72af08d97a
71 changed files with 596 additions and 444 deletions

2
.gitignore vendored
View File

@ -66,6 +66,8 @@ CMakeError.log
/test/cfg /test/cfg
/src/.vs /src/.vs
*.o *.o
version.c
taos.rc
src/connector/jdbc/.settings/ src/connector/jdbc/.settings/
tests/comparisonTest/cassandra/cassandratest/.classpath tests/comparisonTest/cassandra/cassandratest/.classpath
tests/comparisonTest/cassandra/cassandratest/.project tests/comparisonTest/cassandra/cassandratest/.project

View File

@ -28,6 +28,7 @@ INCLUDE(cmake/input.inc)
INCLUDE(cmake/platform.inc) INCLUDE(cmake/platform.inc)
INCLUDE(cmake/define.inc) INCLUDE(cmake/define.inc)
INCLUDE(cmake/env.inc) INCLUDE(cmake/env.inc)
INCLUDE(cmake/version.inc)
INCLUDE(cmake/install.inc) INCLUDE(cmake/install.inc)
ADD_SUBDIRECTORY(deps) ADD_SUBDIRECTORY(deps)

42
cmake/version.inc Normal file
View File

@ -0,0 +1,42 @@
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
SET(TD_VER_1 "2")
SET(TD_VER_2 "0")
SET(TD_VER_3 "2")
SET(TD_VER_4 "3")
SET(TD_VER_GIT "d711657139620f6c50f362597020705b8ad26bd2")
SET(TD_VER_GIT_INTERNAL "1d74ae24c541ffbb280e8630883c0236cd45f8c7")
SET(TD_VER_VERTYPE "stable")
SET(TD_VER_CPUTYPE "x64")
SET(TD_VER_OSTYPE "Linux")
SET(TD_VER_COMPATIBLE "2.0.0.0")
STRING(TIMESTAMP TD_VER_DATE "%Y-%m-%d %H:%M:%S")
IF (TD_LINUX_64)
SET(TD_VER_CPUTYPE "x64")
ENDIF ()
IF (TD_LINUX_32)
SET(TD_VER_CPUTYPE "x86")
ENDIF ()
IF (TD_ARM_64)
SET(TD_VER_CPUTYPE "aarch64")
ENDIF ()
IF (TD_ARM_32)
SET(TD_VER_CPUTYPE "aarch32")
ENDIF ()
IF (TD_WINDOWS_64)
SET(TD_VER_CPUTYPE "x64")
ENDIF ()
IF (TD_WINDOWS_32)
SET(TD_VER_CPUTYPE "x86")
ENDIF ()
CONFIGURE_FILE("${TD_COMMUNITY_DIR}/src/util/src/version.c.in" "${TD_COMMUNITY_DIR}/src/util/src/version.c")

View File

@ -4,5 +4,5 @@ PROJECT(TDengine)
IF (TD_WINDOWS) IF (TD_WINDOWS)
INCLUDE_DIRECTORIES(include) INCLUDE_DIRECTORIES(include)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(MsvcLibXw64 ${SRC}) ADD_LIBRARY(MsvcLibXw ${SRC})
ENDIF () ENDIF ()

View File

@ -12,6 +12,6 @@ ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib"
ENV LANG=en_US.UTF-8 ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US:en ENV LANGUAGE=en_US:en
ENV LC_ALL=en_US.UTF-8 ENV LC_ALL=en_US.UTF-8
EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 6042
CMD ["taosd"] CMD ["taosd"]
VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos/" ] VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos/" ]

View File

@ -10,6 +10,7 @@ set -e
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta] # -V [stable | beta]
# -l [full | lite] # -l [full | lite]
# -s [static | dynamic]
# -n [2.0.0.3] # -n [2.0.0.3]
# set parameters by default value # set parameters by default value
@ -18,9 +19,10 @@ verType=stable # [stable, beta]
cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite] pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic]
verNumber="" verNumber=""
while getopts "hv:V:c:o:l:n:" arg while getopts "hv:V:c:o:l:s:n:" arg
do do
case $arg in case $arg in
v) v)
@ -39,6 +41,10 @@ do
#echo "pagMode=$OPTARG" #echo "pagMode=$OPTARG"
pagMode=$(echo $OPTARG) pagMode=$(echo $OPTARG)
;; ;;
s)
#echo "soMode=$OPTARG"
soMode=$(echo $OPTARG)
;;
n) n)
#echo "verNumber=$OPTARG" #echo "verNumber=$OPTARG"
verNumber=$(echo $OPTARG) verNumber=$(echo $OPTARG)
@ -53,6 +59,7 @@ do
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
echo " -l [full | lite] " echo " -l [full | lite] "
echo " -s [static | dynamic] "
echo " -n [version number] " echo " -n [version number] "
exit 0 exit 0
;; ;;
@ -63,7 +70,7 @@ do
esac esac
done done
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} verNumber=${verNumber}" echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} verNumber=${verNumber}"
curr_dir=$(pwd) curr_dir=$(pwd)
@ -223,9 +230,9 @@ cd ${compile_dir}
# check support cpu type # check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then
if [ "$verMode" != "cluster" ]; then if [ "$verMode" != "cluster" ]; then
cmake ../ -DCPUTYPE=${cpuType} -DPAGMODE=${pagMode} -DOSTYPE=${osType} cmake ../ -DCPUTYPE=${cpuType} -DPAGMODE=${pagMode} -DOSTYPE=${osType} -DSOMODE=${soMode}
else else
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode}
fi fi
else else
echo "input cpuType=${cpuType} error!!!" echo "input cpuType=${cpuType} error!!!"

View File

@ -35,12 +35,14 @@ IF (TD_LINUX)
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32)
CONFIGURE_FILE("${TD_COMMUNITY_DIR}/src/client/src/taos.rc.in" "${TD_COMMUNITY_DIR}/src/client/src/taos.rc")
ADD_LIBRARY(taos_static STATIC ${SRC}) ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static trpc tutil query) TARGET_LINK_LIBRARIES(taos_static trpc tutil query)
# generate dynamic library (*.dll) # generate dynamic library (*.dll)
ADD_LIBRARY(taos SHARED ${SRC}) ADD_LIBRARY(taos SHARED ${SRC} ${TD_COMMUNITY_DIR}/src/client/src/taos.rc)
IF (NOT TD_GODLL) IF (NOT TD_GODLL)
SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def) SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def)
ENDIF () ENDIF ()

View File

@ -108,7 +108,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList); void* tscDestroyBlockArrayList(SArray* pDataBlockList);

View File

@ -234,7 +234,7 @@ typedef struct {
char * curSql; // current sql, resume position of sql after parsing paused char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished; int8_t parseFinished;
short numOfCols; int16_t numOfCols;
uint32_t allocSize; uint32_t allocSize;
char * payload; char * payload;
int32_t payloadLen; int32_t payloadLen;

31
src/client/src/taos.rc.in Normal file
View File

@ -0,0 +1,31 @@
1 VERSIONINFO
FILEVERSION ${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3}
PRODUCTVERSION ${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3}
FILEFLAGSMASK 0x17L
#ifdef _DEBUG
FILEFLAGS 0x1L
#else
FILEFLAGS 0x0L
#endif
FILEOS 0x4L
FILETYPE 0x0L
FILESUBTYPE 0x0L
BEGIN
BLOCK "StringFileInfo"
BEGIN
BLOCK "040904b0"
BEGIN
VALUE "FileDescription", "Native C Driver for TDengine"
VALUE "FileVersion", "${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3}"
VALUE "InternalName", "taos.dll(${TD_VER_CPUTYPE})"
VALUE "LegalCopyright", "Copyright (C) 2020 TAOS Data"
VALUE "OriginalFilename", ""
VALUE "ProductName", "taos.dll(${TD_VER_CPUTYPE})"
VALUE "ProductVersion", "${TD_VER_1}.${TD_VER_2}.${TD_VER_3}.${TD_VER_4}"
END
END
BLOCK "VarFileInfo"
BEGIN
VALUE "Translation", 0x409, 1200
END
END

View File

@ -285,9 +285,9 @@ void tscKillConnection(STscObj *pObj) {
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
//taosStopRpcConn(pSql->thandle);
pSql = pSql->next; pSql = pSql->next;
} }
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {

View File

@ -226,13 +226,17 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = &pSql->pRpcCtx, .handle = &pSql->pRpcCtx,
.code = 0 .code = 0
}; };
// NOTE: the rpc context should be acquired before sending data to server. // NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); if (pObj != NULL && pObj->signature == pObj) {
return TSDB_CODE_SUCCESS; rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
} else {
//pObj->signature has been reset by other thread, ignore concurrency problem
return TSDB_CODE_TSC_CONN_KILLED;
}
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {

View File

@ -201,7 +201,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
} }
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
tscDebug("try to create a connection to %s:%u, user:%s db:%s", ip, port, user, db); tscDebug("try to create a connection to %s:%u, user:%s db:%s", ip, port != 0 ? port : tsServerPort , user, db);
if (user == NULL) user = TSDB_DEFAULT_USER; if (user == NULL) user = TSDB_DEFAULT_USER;
if (pass == NULL) pass = TSDB_DEFAULT_PASS; if (pass == NULL) pass = TSDB_DEFAULT_PASS;

View File

@ -404,7 +404,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
taosTFree(pDataBlock); taosTFree(pDataBlock);
} }
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset) { uint32_t offset) {
uint32_t needed = pDataBlock->numOfParams + 1; uint32_t needed = pDataBlock->numOfParams + 1;
if (needed > pDataBlock->numOfAllocedParams) { if (needed > pDataBlock->numOfAllocedParams) {

View File

@ -29,6 +29,7 @@ extern uint16_t tsServerPort;
extern uint16_t tsDnodeShellPort; extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeDnodePort; extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;
extern uint16_t tsArbitratorPort;
extern int32_t tsStatusInterval; extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes; extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak; extern int32_t tsEnableVnodeBak;

View File

@ -37,6 +37,7 @@ uint16_t tsServerPort = 6030;
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
uint16_t tsDnodeDnodePort = 6035; // udp/tcp uint16_t tsDnodeDnodePort = 6035; // udp/tcp
uint16_t tsSyncPort = 6040; uint16_t tsSyncPort = 6040;
uint16_t tsArbitratorPort = 6042;
int32_t tsStatusInterval = 1; // second int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 3; int32_t tsNumOfMnodes = 3;
int32_t tsEnableVnodeBak = 1; int32_t tsEnableVnodeBak = 1;
@ -1331,7 +1332,10 @@ int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
*port = atoi(temp+1); *port = atoi(temp+1);
} }
if (*port == 0) *port = tsServerPort; if (*port == 0) {
*port = tsServerPort;
return -1;
}
return 0; return 0;
} }

View File

@ -119,11 +119,8 @@ int32_t main(int32_t argc, char *argv[]) {
syslog(LOG_INFO, "Started TDengine service successfully."); syslog(LOG_INFO, "Started TDengine service successfully.");
for (int res = tsem_wait(&exitSem); res != 0; res = tsem_wait(&exitSem)) { if (tsem_wait(&exitSem) != 0) {
if (res != EINTR) { syslog(LOG_ERR, "failed to wait exit semphore: %s", strerror(errno));
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
break;
}
} }
dnodeCleanUpSystem(); dnodeCleanUpSystem();

View File

@ -93,7 +93,7 @@ DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision
DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *tres); DLL_EXPORT int taos_field_count(TAOS_RES *tres);
DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *taos); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);

View File

@ -63,7 +63,7 @@ typedef struct tstr {
extern const int32_t TYPE_BYTES[11]; extern const int32_t TYPE_BYTES[11];
// TODO: replace and remove code below // TODO: replace and remove code below
#define CHAR_BYTES sizeof(char) #define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(short) #define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int) #define INT_BYTES sizeof(int)
#define LONG_BYTES sizeof(int64_t) #define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float) #define FLOAT_BYTES sizeof(float)

View File

@ -66,6 +66,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid me
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, 0, 0x0015, "Unable to resolve FQDN")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported") TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported")
@ -96,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_APP_ERROR, 0, 0x0211, "Applicatio
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
// mnode // mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")

View File

@ -424,7 +424,10 @@ typedef struct SColumnInfo {
int16_t type; int16_t type;
int16_t bytes; int16_t bytes;
int16_t numOfFilters; int16_t numOfFilters;
SColumnFilterInfo *filters; union{
int64_t placeholder;
SColumnFilterInfo *filters;
};
} SColumnInfo; } SColumnInfo;
typedef struct STableIdInfo { typedef struct STableIdInfo {

View File

@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {

View File

@ -64,7 +64,7 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/ */
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file

View File

@ -27,35 +27,46 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <wordexp.h>
#define MAX_PKG_LEN (64*1000) #define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024) #define BUFFER_SIZE (MAX_PKG_LEN + 1024)
#define TEST_FQDN_LEN 128
#define TEST_IPv4ADDR_LEN 16
typedef struct { typedef struct {
int port; uint16_t port;
char *host; uint32_t hostIp;
char fqdn[TEST_FQDN_LEN];
uint16_t pktLen; uint16_t pktLen;
} info_s; } info_s;
typedef struct Arguments { typedef struct Arguments {
char * host; char host[TEST_IPv4ADDR_LEN];
char fqdn[TEST_FQDN_LEN];
uint16_t port; uint16_t port;
uint16_t max_port; uint16_t max_port;
uint16_t pktLen; uint16_t pktLen;
} SArguments; } SArguments;
static struct argp_option options[] = { static struct argp_option options[] = {
{0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'h', "host ip", 0, "The host ip to connect to TDEngine. Default is localhost.", 0},
{0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1},
{0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6042.", 2},
{0, 'f', "host fqdn", 0, "The host fqdn to connect to TDEngine.", 3},
{0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state) {
wordexp_t full_path;
SArguments *arguments = state->input; SArguments *arguments = state->input;
switch (key) { switch (key) {
case 'h': case 'h':
arguments->host = arg; if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid host ip %s\n", arg);
return -1;
}
strcpy(arguments->host, full_path.we_wordv[0]);
wordfree(&full_path);
break; break;
case 'p': case 'p':
arguments->port = atoi(arg); arguments->port = atoi(arg);
@ -66,6 +77,14 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'l': case 'l':
arguments->pktLen = atoi(arg); arguments->pktLen = atoi(arg);
break; break;
case 'f':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid host fqdn %s\n", arg);
return -1;
}
strcpy(arguments->fqdn, full_path.we_wordv[0]);
wordfree(&full_path);
break;
default: default:
return ARGP_ERR_UNKNOWN; return ARGP_ERR_UNKNOWN;
@ -76,8 +95,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, 0, 0}; static struct argp argp = {options, parse_opt, 0, 0};
int checkTcpPort(info_s *info) { int checkTcpPort(info_s *info) {
int port = info->port;
char *host = info->host;
int clientSocket; int clientSocket;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
@ -88,21 +105,35 @@ int checkTcpPort(info_s *info) {
printf("socket() fail: %s\n", strerror(errno)); printf("socket() fail: %s\n", strerror(errno));
return -1; return -1;
} }
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(host); // set send and recv overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
//printf("=================================\n"); //printf("=================================\n");
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
printf("connect() fail: %s\n", strerror(errno)); printf("connect() fail: %s\t", strerror(errno));
return -1; return -1;
} }
//printf("Connect to: %s:%d...success\n", host, port); //printf("Connect to: %s:%d...success\n", host, port);
memset(sendbuf, 0, BUFFER_SIZE); memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", host, port); struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
send(clientSocket, sendbuf, info->pktLen, 0); send(clientSocket, sendbuf, info->pktLen, 0);
@ -120,7 +151,7 @@ int checkTcpPort(info_s *info) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
} else { } else {
printf("recv ack pkg from TCP port: %d fail:%s.\n", port, strerror(errno)); printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno));
close(clientSocket); close(clientSocket);
return -1; return -1;
} }
@ -132,7 +163,7 @@ int checkTcpPort(info_s *info) {
} }
if (iDataNum < info->pktLen) { if (iDataNum < info->pktLen) {
printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port); printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port);
return -1; return -1;
} }
//printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
@ -142,8 +173,6 @@ int checkTcpPort(info_s *info) {
} }
int checkUdpPort(info_s *info) { int checkUdpPort(info_s *info) {
int port = info->port;
char *host = info->host;
int clientSocket; int clientSocket;
struct sockaddr_in serverAddr; struct sockaddr_in serverAddr;
@ -154,15 +183,28 @@ int checkUdpPort(info_s *info) {
perror("socket"); perror("socket");
return -1; return -1;
} }
// set overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
}
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port); serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = inet_addr(host); serverAddr.sin_addr.s_addr = info->hostIp;
memset(sendbuf, 0, BUFFER_SIZE); memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE);
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", host, port); struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
@ -176,7 +218,7 @@ int checkUdpPort(info_s *info) {
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < info->pktLen) { if (iDataNum < info->pktLen) {
printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\n", iDataNum, info->pktLen, port); printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port);
return -1; return -1;
} }
@ -185,10 +227,61 @@ int checkUdpPort(info_s *info) {
return 0; return 0;
} }
int main(int argc, char *argv[]) { int32_t getIpFromFqdn(const char *fqdn, uint32_t* ip) {
SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; struct addrinfo hints = {0};
info_s info; hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result = NULL;
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in*)sa;
struct in_addr ia = si->sin_addr;
*ip = ia.s_addr;
freeaddrinfo(result);
return 0;
} else {
printf("Failed get the ip address from fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret));
return -1;
}
}
void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) {
int ret; int ret;
info_s info;
memset(&info, 0, sizeof(info_s));
info.hostIp = hostIp;
info.pktLen = pktLen;
for (uint16_t port = startPort; port <= maxPort; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
info.port = port;
ret = checkTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.\t\n", port);
} else {
printf("tcp port:%d test ok.\t\t", port);
}
ret = checkUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.\t\n", port);
} else {
printf("udp port:%d test ok.\t\t", port);
}
}
printf("\n");
return ;
}
int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", "", 6030, 6042, 1000};
int ret;
argp_parse(&argp, argc, argv, 0, 0, &arguments); argp_parse(&argp, argc, argv, 0, 0, &arguments);
if (arguments.pktLen > MAX_PKG_LEN) { if (arguments.pktLen > MAX_PKG_LEN) {
@ -196,32 +289,25 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
printf("host: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.port, arguments.max_port, arguments.pktLen); printf("host ip: %s\thost fqdn: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.fqdn, arguments.port, arguments.max_port, arguments.pktLen);
int port = arguments.port; if (arguments.host[0] != 0) {
printf("\nstart connect to %s test:\n", arguments.host);
info.host = arguments.host; checkPort(inet_addr(arguments.host), arguments.port, arguments.max_port, arguments.pktLen);
info.pktLen = arguments.pktLen;
for (; port <= arguments.max_port; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n"); printf("\n");
info.port = port;
ret = checkTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.\t\t", port);
} else {
printf("tcp port:%d test ok.\t\t", port);
}
ret = checkUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.\t\t", port);
} else {
printf("udp port:%d test ok.\t\t", port);
}
} }
printf("\n");
if (arguments.fqdn[0] != 0) {
uint32_t hostIp = 0;
ret = getIpFromFqdn(arguments.fqdn, &hostIp);
if (ret) {
printf("\n");
return 0;
}
printf("\nstart connetc to %s test:\n", arguments.fqdn);
checkPort(hostIp, arguments.port, arguments.max_port, arguments.pktLen);
printf("\n");
}
return 0; return 0;
} }

View File

@ -142,9 +142,9 @@ static void *bindTcpPort(void *sarg) {
printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum);
if (iDataNum > 0) { if (iDataNum > 0) {
send(client, buffer, iDataNum, 0); send(client, buffer, iDataNum, 0);
break;
} }
} }
close(serverSocket); close(serverSocket);
return NULL; return NULL;
} }
@ -201,7 +201,7 @@ static void *bindUdpPort(void *sarg) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; SArguments arguments = {"127.0.0.1", 6030, 6042, 1000};
argp_parse(&argp, argc, argv, 0, 0, &arguments); argp_parse(&argp, argc, argv, 0, 0, &arguments);
if (arguments.pktLen > MAX_PKG_LEN) { if (arguments.pktLen > MAX_PKG_LEN) {
printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN);

View File

@ -354,7 +354,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, SMnodeMsg
mDebug("db:%s, already exist, ignore exist is set", pCreate->db); mDebug("db:%s, already exist, ignore exist is set", pCreate->db);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
mError("db:%s, is already exist, ignore exist not set", pCreate->db); mError("db:%s, already exist, ignore exist not set", pCreate->db);
return TSDB_CODE_MND_DB_ALREADY_EXIST; return TSDB_CODE_MND_DB_ALREADY_EXIST;
} }
} }

View File

@ -518,7 +518,7 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
SDnodeObj *pDnode = mnodeGetDnodeByEp(ep); SDnodeObj *pDnode = mnodeGetDnodeByEp(ep);
if (pDnode != NULL) { if (pDnode != NULL) {
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
mError("dnode:%d is already exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); mError("dnode:%d, already exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
return TSDB_CODE_MND_DNODE_ALREADY_EXIST; return TSDB_CODE_MND_DNODE_ALREADY_EXIST;
} }

View File

@ -100,7 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
}; };
tstrncpy(connObj.user, user, sizeof(connObj.user)); tstrncpy(connObj.user, user, sizeof(connObj.user));
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME); SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn; return pConn;

View File

@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
} }
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
return 0; return 0;
} }

View File

@ -23,7 +23,7 @@ extern "C" {
#ifndef TAOS_OS_FUNC_SEMPHONE #ifndef TAOS_OS_FUNC_SEMPHONE
#define tsem_t sem_t #define tsem_t sem_t
#define tsem_init sem_init #define tsem_init sem_init
#define tsem_wait sem_wait int tsem_wait(tsem_t* sem);
#define tsem_post sem_post #define tsem_post sem_post
#define tsem_destroy sem_destroy #define tsem_destroy sem_destroy
#endif #endif

View File

@ -16,6 +16,18 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#ifndef TAOS_OS_FUNC_SEMPHONE
int tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
#endif
#ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD #ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }

View File

@ -4,4 +4,4 @@ PROJECT(TDengine)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
ADD_LIBRARY(os ${SRC}) ADD_LIBRARY(os ${SRC})
TARGET_LINK_LIBRARIES(os winmm IPHLPAPI ws2_32 MsvcLibXw64) TARGET_LINK_LIBRARIES(os winmm IPHLPAPI ws2_32 MsvcLibXw)

View File

@ -563,7 +563,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
if (peerIp == 0xFFFFFFFF) { if (peerIp == 0xFFFFFFFF) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_RPC_APP_ERROR; terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL; return NULL;
} }

View File

@ -74,7 +74,7 @@ typedef struct {
uint32_t magic; uint32_t magic;
uint32_t index; uint32_t index;
uint64_t fversion; uint64_t fversion;
int32_t size; int64_t size;
} SFileInfo; } SFileInfo;
typedef struct { typedef struct {

View File

@ -26,7 +26,7 @@ typedef void* tthread_h;
typedef struct { typedef struct {
int numOfThreads; int numOfThreads;
uint32_t serverIp; uint32_t serverIp;
short port; int16_t port;
int bufferSize; int bufferSize;
void (*processBrokenLink)(void *ahandle); void (*processBrokenLink)(void *ahandle);
int (*processIncomingMsg)(void *ahandle, void *buffer); int (*processIncomingMsg)(void *ahandle, void *buffer);

View File

@ -108,8 +108,7 @@ static void syncModuleInitFunc() {
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
} }
void *syncStart(const SSyncInfo *pInfo) void *syncStart(const SSyncInfo *pInfo) {
{
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1);
@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo)
return pNode; return pNode;
} }
void syncStop(void *param) void syncStop(void *param) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
if (pNode == NULL) return; if (pNode == NULL) return;
@ -215,9 +213,8 @@ void syncStop(void *param)
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
int i, j; int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param; SSyncPeer * pPeer;
SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead *pWalHead = data; SWalHead *pWalHead = data;
int fwdLen; int fwdLen;
@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) void syncConfirmForward(void *param, uint64_t version, int32_t code) {
{ SSyncNode *pNode = param;
SSyncNode *pNode = param;
if (pNode == NULL) return; if (pNode == NULL) return;
if (pNode->quorum <= 1) return; if (pNode->quorum <= 1) return;
@ -387,10 +382,9 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i=0; i<pNode->replica; ++i) { for (int i=0; i<pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
return 0; return 0;
} }
static void syncAddArbitrator(SSyncNode *pNode) static void syncAddArbitrator(SSyncNode *pNode) {
{
SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
// if not configured, return right away // if not configured, return right away
@ -413,9 +406,11 @@ static void syncAddArbitrator(SSyncNode *pNode)
SNodeInfo nodeInfo; SNodeInfo nodeInfo;
nodeInfo.nodeId = 0; nodeInfo.nodeId = 0;
taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); int ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort);
nodeInfo.nodePort += TSDB_PORT_SYNC; if (-1 == ret) {
nodeInfo.nodePort = tsArbitratorPort;
}
if (pPeer) { if (pPeer) {
if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) { if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) {
return; return;
@ -454,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode)
} }
} }
void syncAddPeerRef(SSyncPeer *pPeer) void syncAddPeerRef(SSyncPeer *pPeer) {
{
atomic_add_fetch_8(&pPeer->refCount, 1); atomic_add_fetch_8(&pPeer->refCount, 1);
} }
int syncDecPeerRef(SSyncPeer *pPeer) int syncDecPeerRef(SSyncPeer *pPeer) {
{
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode); syncDecNodeRef(pPeer->pSyncNode);
@ -473,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer)
return 1; return 1;
} }
static void syncClosePeerConn(SSyncPeer *pPeer) static void syncClosePeerConn(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >=0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); taosFreeTcpConn(pPeer->pConn);
} }
} }
static void syncRemovePeer(SSyncPeer *pPeer) static void syncRemovePeer(SSyncPeer *pPeer) {
{
sInfo("%s, it is removed", pPeer->id); sInfo("%s, it is removed", pPeer->id);
pPeer->ip = 0; pPeer->ip = 0;
@ -492,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer)
syncDecPeerRef(pPeer); syncDecPeerRef(pPeer);
} }
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
{
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL; if (ip == -1) return NULL;
@ -523,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
return pPeer; return pPeer;
} }
void syncBroadcastStatus(SSyncNode *pNode) void syncBroadcastStatus(SSyncNode *pNode) {
{
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue; if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1);
} }
} }
static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pNode->peerInfo[i]->numOfRetrieves = 0; pNode->peerInfo[i]->numOfRetrieves = 0;
} }
if (pNode->notifyFlowCtrl) if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0); (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
}
} }
static void syncChooseMaster(SSyncNode *pNode) { static void syncChooseMaster(SSyncNode *pNode) {
@ -598,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
} else { } else {
sDebug("vgId:%d, failed to choose master", pNode->vgId); sDebug("vgId:%d, failed to choose master", pNode->vgId);
} }
} }
static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int onlineNum = 0; int onlineNum = 0;
int index = -1; int index = -1;
int replica = pNode->replica; int replica = pNode->replica;
@ -617,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
replica = pNode->replica + 1; replica = pNode->replica + 1;
} }
if (onlineNum <= replica*0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
pNode->peerInfo[pNode->selfIndex]->role = nodeRole; pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
@ -625,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
for (int i=0; i<pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[i];
if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
if ( index < 0 ) { if (index < 0) {
index = i; index = i;
} else { // multiple masters, it shall not happen } else { // multiple masters, it shall not happen
if ( i == pNode->selfIndex ) { if (i == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id); sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
@ -640,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
} }
} }
SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL;
return pMaster; return pMaster;
} }
@ -649,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
int code = 0; int code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1; code = -1;
@ -658,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
if ( i == pNode->selfIndex ) continue; if ( i == pNode->selfIndex ) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[i]);
} }
} }
return code; return code;
} }
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int8_t peerOldRole = pPeer->role; int8_t peerOldRole = pPeer->role;
int8_t selfOldRole = nodeRole; int8_t selfOldRole = nodeRole;
@ -686,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if (syncValidateMaster(pPeer) < 0) return; if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if ( nodeVersion < pMaster->version) { if (nodeVersion < pMaster->version) {
syncRequired = 1; syncRequired = 1;
} else { } else {
sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
} else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
// nodeVersion = pMaster->version; // nodeVersion = pMaster->version;
} }
} else { } else {
@ -734,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort))
taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
} }
void syncRestartConnection(SSyncPeer *pPeer) void syncRestartConnection(SSyncPeer *pPeer) {
{
if (pPeer->ip == 0) return; if (pPeer->ip == 0) return;
syncRestartPeer(pPeer); syncRestartPeer(pPeer);
syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE);
} }
static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, sync-req is received", pPeer->id); sDebug("%s, sync-req is received", pPeer->id);
@ -782,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
} }
} }
static void syncNotStarted(void *param, void *tmrId) static void syncNotStarted(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
@ -803,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncRecoverFromMaster(SSyncPeer *pPeer) static void syncRecoverFromMaster(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus);
return; return;
} }
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
if (tsSyncNum >= tsMaxSyncNum) { if (tsSyncNum >= tsMaxSyncNum) {
@ -840,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
return; return;
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo *pFwdInfo; SFwdInfo *pFwdInfo;
@ -862,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
} }
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) SSyncNode * pNode = pPeer->pSyncNode;
{
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *)cont; SWalHead *pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
@ -884,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
return; return;
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont; SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
@ -909,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
} }
// head.len = htonl(head.len); // head.len = htonl(head.len);
if (pHead->len <0) { if (pHead->len < 0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
return -1; return -1;
} }
int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
@ -923,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return 0; return 0;
} }
static int syncProcessPeerMsg(void *param, void *buffer) static int syncProcessPeerMsg(void *param, void *buffer) {
{ SSyncPeer * pPeer = param;
SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead head;
char *cont = (char *)buffer; char *cont = (char *)buffer;
@ -953,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer)
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0}; char msg[statusMsgLen] = {0};
@ -1011,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id); sDebug("%s, connection to peer server is setup", pPeer->id);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
@ -1024,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} }
} }
static void syncCheckPeerConnection(void *param, void *tmrId) static void syncCheckPeerConnection(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
@ -1037,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId)
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncCreateRestoreDataThread(SSyncPeer *pPeer) static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
pthread_attr_t thattr; pthread_attr_t thattr;
@ -1059,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
} }
} }
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
{
char ipstr[24]; char ipstr[24];
int i; int i;
@ -1137,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) {
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
@ -1160,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
} }
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds; int fwds = pSyncFwds->fwds;
@ -1178,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
} }
} }
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) {
{
int confirm = 0; int confirm = 0;
if (pFwdInfo->code == 0) pFwdInfo->code = code; if (pFwdInfo->code == 0) pFwdInfo->code = code;
@ -1200,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) static void syncMonitorFwdInfos(void *param, void *tmrId) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
@ -1220,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId)
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
} }

View File

@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
char fname[TSDB_FILENAME_LEN*3] = {0}; char fname[TSDB_FILENAME_LEN*3] = {0};
uint32_t magic; uint32_t magic;
uint64_t fversion; uint64_t fversion;
int32_t size; int64_t size;
uint32_t index = sindex; uint32_t index = sindex;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
} }
} }
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
close(dfd); close(dfd);
if (ret<0) break; if (ret<0) break;
sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size); sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
} }
@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
return code; return code;
} }
static int syncRestoreWal(SSyncPeer *pPeer) static int syncRestoreWal(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1; int ret, code = -1;
@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
return offset; return offset;
} }
static int syncProcessBufferedFwd(SSyncPeer *pPeer) static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0; int forwards = 0;
@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer)
return pRecv->code; return pRecv->code;
} }
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
return pRecv->code; return pRecv->code;
} }
static void syncCloseRecvBuffer(SSyncNode *pNode) static void syncCloseRecvBuffer(SSyncNode *pNode) {
{
if (pNode->pRecv) { if (pNode->pRecv) {
taosTFree(pNode->pRecv->buffer); taosTFree(pNode->pRecv->buffer);
} }
@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode)
taosTFree(pNode->pRecv); taosTFree(pNode->pRecv);
} }
static int syncOpenRecvBuffer(SSyncNode *pNode) static int syncOpenRecvBuffer(SSyncNode *pNode) {
{
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode)
return 0; return 0;
} }
static int syncRestoreDataStepByStep(SSyncPeer *pPeer) static int syncRestoreDataStepByStep(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_FILE; nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0; uint64_t fversion = 0;
@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRestoreData(void *param) void *syncRestoreData(void *param) {
{ SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); __sync_fetch_and_add(&tsSyncNum, 1);
@ -326,4 +318,3 @@ void *syncRestoreData(void *param)
return NULL; return NULL;
} }

View File

@ -27,11 +27,10 @@
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
{
sDebug("%s, start to monitor:%s", pPeer->id, name); sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <=0) { if (pPeer->notifyFd <= 0) {
pPeer->watchNum = 0; pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) { if (pPeer->notifyFd < 0) {
@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
return 0; return 0;
} }
static int syncAreFilesModified(SSyncPeer *pPeer) static int syncAreFilesModified(SSyncPeer *pPeer) {
{ if (pPeer->notifyFd <= 0) return 0;
if (pPeer->notifyFd <=0) return 0;
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
} }
} }
return code; return code;
} }
static int syncRetrieveFile(SSyncPeer *pPeer) static int syncRetrieveFile(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; SFileInfo fileInfo;
SFileAck fileAck; SFileAck fileAck;
int code = -1; int code = -1;
@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break; if (ret < 0) break;
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// send the file to peer // send the file to peer
int sfd = open(name, O_RDONLY); int sfd = open(name, O_RDONLY);
if ( sfd < 0 ) break; if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret <0) break; if (ret < 0) break;
sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size); sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
fileInfo.index++; fileInfo.index++;
// check if processed files are modified // check if processed files are modified
@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
/* if only a partial record is read out, set the IN_MODIFY flag in event, /* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */ so upper layer will reload the file to get a complete record */
static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
{
int ret; int ret;
ret = read(sfd, pHead, sizeof(SWalHead)); ret = read(sfd, pHead, sizeof(SWalHead));
@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
ret = read(sfd, pHead->cont, pHead->len); ret = read(sfd, pHead->cont, pHead->len);
if (ret <0) return -1; if (ret < 0) return -1;
if (ret != pHead->len) { if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded // file is not at end yet, it shall be reloaded
@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
{
pPeer->watchNum = 0; pPeer->watchNum = 0;
taosClose(pPeer->notifyFd); taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
return -1; return -1;
} }
return 0; return 0;
} }
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
{
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len <0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
} }
if (len == 0) return 0; if (len == 0) return 0;
struct inotify_event *event; struct inotify_event *event;
@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
return 0; return 0;
} }
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
{
SWalHead *pHead = (SWalHead *) malloc(640000); SWalHead *pHead = (SWalHead *) malloc(640000);
int code = -1; int code = -1;
int32_t bytes = 0; int32_t bytes = 0;
@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); int wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize <0) break; if (wsize < 0) break;
if (wsize == 0) { code = 0; break; } if (wsize == 0) {
code = 0;
break;
}
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
} }
if (code < 0) break; if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break; if (pPeer->sversion >= fversion && fversion > 0) break;
index++; wname[0] = 0; index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if ( code < 0) break; if (code < 0) break;
if ( wname[0] == 0 ) {code = 0; break;} if (wname[0] == 0) {
code = 0;
break;
}
// current last wal is closed, there is a new one // current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id); sDebug("%s, last wal is closed, try new one", pPeer->id);
@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
return code; return code;
} }
static int syncRetrieveWal(SSyncPeer *pPeer) static int syncRetrieveWal(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
// send wal file, // send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok // inotify is not required, old wal file won't be modified, even remove is ok
if ( stat(fname, &fstat) < 0 ) break; if (stat(fname, &fstat) < 0) break;
size = fstat.st_size; size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
return code; return code;
} }
static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRetrieveData(void *param) void *syncRetrieveData(void *param) {
{ SSyncPeer * pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();

View File

@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
static void taosStopPoolThread(SThreadObj* pThread); static void taosStopPoolThread(SThreadObj* pThread);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
{
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) void taosCloseTcpThreadPool(void *param) {
{
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
SThreadObj *pThread; SThreadObj *pThread;
@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param)
uDebug("%p TCP pool is closed", pPool); uDebug("%p TCP pool is closed", pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
{
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) void taosFreeTcpConn(void *param) {
{ SConnObj * pConn = (SConnObj *)param;
SConnObj *pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);

View File

@ -40,13 +40,9 @@ typedef struct {
void *pConn; void *pConn;
} SNodeConn; } SNodeConn;
uint16_t tsArbitratorPort = 0;
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; char arbLogPath[TSDB_FILENAME_LEN + 16] = {0};
tsArbitratorPort = tsServerPort + TSDB_PORT_ARBITRATOR;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
tsArbitratorPort = atoi(argv[++i]); tsArbitratorPort = atoi(argv[++i]);
@ -103,9 +99,7 @@ int main(int argc, char *argv[]) {
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort); sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort);
for (int res = tsem_wait(&tsArbSem); res != 0; res = tsem_wait(&tsArbSem)) { tsem_wait(&tsArbSem);
if (res != EINTR) break;
}
taosCloseTcpThreadPool(tsArbTcpPool); taosCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n"); sInfo("TAOS arbitrator is shut down\n");
@ -156,9 +150,8 @@ static void arbProcessBrokenLink(void *param) {
taosTFree(pNode); taosTFree(pNode);
} }
static int arbProcessPeerMsg(void *param, void *buffer) static int arbProcessPeerMsg(void *param, void *buffer) {
{ SNodeConn * pNode = param;
SNodeConn *pNode = param;
SSyncHead head; SSyncHead head;
int bytes = 0; int bytes = 0;
char *cont = (char *)buffer; char *cont = (char *)buffer;
@ -180,7 +173,6 @@ static int arbProcessPeerMsg(void *param, void *buffer)
} }
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) {
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
@ -192,4 +184,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
// inform main thread to exit // inform main thread to exit
tsem_post(&tsArbSem); tsem_post(&tsArbSem);
} }

View File

@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion)
{ {
uint32_t magic; uint32_t magic;
struct stat fstat; struct stat fstat;

View File

@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int32_t* size); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c

View File

@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
} }
} }
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) { void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
uint32_t version = 0; uint32_t version = 0;
STsdbFileInfo info = {0}; STsdbFileInfo info = {0};
@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) {
close(fd); close(fd);
*magic = info.magic; *magic = info.magic;
*size = (int32_t)offset; *size = offset;
return; return;

View File

@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
return 0; return 0;
} }
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size) { uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;

View File

@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore *pStore); int tdKVStoreEndCommit(SKVStore *pStore);
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size); void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -220,8 +220,14 @@ static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
char pattern[128] = {0}; char pattern[128] = {0};
memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
assert(varDataLen(pRight) < 128); assert(varDataLen(pRight) < 128);
int32_t ret = patternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft), &pInfo); size_t sz = varDataLen(pLeft);
char *buf = malloc(sz + 1);
memcpy(buf, varDataVal(pLeft), sz);
buf[sz] = 0;
int32_t ret = patternMatch(pattern, buf, sz, &pInfo);
free(buf);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
} }

View File

@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
return 0; return 0;
} }
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
SStoreInfo info = {0}; SStoreInfo info = {0};
@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) {
close(fd); close(fd);
*magic = info.magic; *magic = info.magic;
*size = (int32_t)offset; *size = offset;
return; return;

View File

@ -123,11 +123,6 @@ void *taosProcessSchedQueue(void *param) {
while (1) { while (1) {
if (tsem_wait(&pSched->fullSem) != 0) { if (tsem_wait(&pSched->fullSem) != 0) {
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
uDebug("wait %s fullSem was interrupted", pSched->label);
continue;
}
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
} }
if (pSched->stop) { if (pSched->stop) {
@ -163,12 +158,8 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
return 0; return 0;
} }
while (tsem_wait(&pSched->emptySem) != 0) { if (tsem_wait(&pSched->emptySem) != 0) {
if (errno != EINTR) { uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
break;
}
uDebug("wait %s emptySem was interrupted", pSched->label);
} }
if (pthread_mutex_lock(&pSched->queueMutex) != 0) if (pthread_mutex_lock(&pSched->queueMutex) != 0)

View File

@ -1,7 +0,0 @@
char version[12] = "2.0.2.0";
char compatible_version[12] = "2.0.0.0";
char gitinfo[48] = "d711657139620f6c50f362597020705b8ad26bd2";
char gitinfoOfInternal[48] = "1d74ae24c541ffbb280e8630883c0236cd45f8c7";
char buildinfo[64] = "Built by root at 2020-08-24 16:31";
void libtaos_2_0_2_0_Linux_x64_beta() {};

View File

@ -0,0 +1,7 @@
char version[12] = "${TD_VER_1}.${TD_VER_2}.${TD_VER_3}.${TD_VER_4}";
char compatible_version[12] = "${TD_VER_COMPATIBLE}";
char gitinfo[48] = "${TD_VER_GIT}";
char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}";
char buildinfo[64] = "Built at ${TD_VER_DATE}";
void libtaos_${TD_VER_1}_${TD_VER_2}_${TD_VER_3}_${TD_VER_4}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};

View File

@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status); static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds); static void vnodeCtrlFlow(void *handle, int32_t mseconds);
@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
if (pVnode->sync == NULL) { if (pVnode->sync == NULL) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
return 0; return 0;
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
*fversion = pVnode->fversion; *fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
@ -549,7 +551,7 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
static void vnodeNotifyRole(void *ahandle, int8_t role) { static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role); vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
pVnode->role = role; pVnode->role = role;
dnodeSendStatusMsgToMnode(); dnodeSendStatusMsgToMnode();

View File

@ -20,16 +20,17 @@ using System.Runtime.InteropServices;
namespace TDengineDriver namespace TDengineDriver
{ {
enum TDengineDataType { enum TDengineDataType {
TSDB_DATA_TYPE_BOOL = 1, TSDB_DATA_TYPE_NULL = 0, // 1 bytes
TSDB_DATA_TYPE_TINYINT = 2, TSDB_DATA_TYPE_BOOL = 1, // 1 bytes
TSDB_DATA_TYPE_SMALLINT = 3, TSDB_DATA_TYPE_TINYINT = 2, // 1 bytes
TSDB_DATA_TYPE_INT = 4, TSDB_DATA_TYPE_SMALLINT = 3, // 2 bytes
TSDB_DATA_TYPE_BIGINT = 5, TSDB_DATA_TYPE_INT = 4, // 4 bytes
TSDB_DATA_TYPE_FLOAT = 6, TSDB_DATA_TYPE_BIGINT = 5, // 8 bytes
TSDB_DATA_TYPE_DOUBLE = 7, TSDB_DATA_TYPE_FLOAT = 6, // 4 bytes
TSDB_DATA_TYPE_BINARY = 8, TSDB_DATA_TYPE_DOUBLE = 7, // 8 bytes
TSDB_DATA_TYPE_TIMESTAMP = 9, TSDB_DATA_TYPE_BINARY = 8, // string
TSDB_DATA_TYPE_NCHAR = 10 TSDB_DATA_TYPE_TIMESTAMP = 9,// 8 bytes
TSDB_DATA_TYPE_NCHAR = 10 // unicode string
} }
enum TDengineInitOption enum TDengineInitOption
@ -79,54 +80,53 @@ namespace TDengineDriver
class TDengine class TDengine
{ {
public const int TSDB_CODE_SUCCESS = 0; public const int TSDB_CODE_SUCCESS = 0;
[DllImport("taos.dll", EntryPoint = "taos_init", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_init", CallingConvention = CallingConvention.Cdecl)]
static extern public void Init(); static extern public void Init();
[DllImport("taos.dll", EntryPoint = "taos_options", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_cleanup", CallingConvention = CallingConvention.Cdecl)]
static extern public void Cleanup();
[DllImport("taos.dll", EntryPoint = "taos_options", CallingConvention = CallingConvention.Cdecl)]
static extern public void Options(int option, string value); static extern public void Options(int option, string value);
[DllImport("taos.dll", EntryPoint = "taos_connect", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_connect", CallingConvention = CallingConvention.Cdecl)]
static extern public long Connect(string ip, string user, string password, string db, int port); static extern public IntPtr Connect(string ip, string user, string password, string db, short port);
[DllImport("taos.dll", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr taos_errstr(long taos); static extern private IntPtr taos_errstr(IntPtr res);
static public string Error(long conn) static public string Error(IntPtr res)
{ {
IntPtr errPtr = taos_errstr(conn); IntPtr errPtr = taos_errstr(res);
return Marshal.PtrToStringAnsi(errPtr); return Marshal.PtrToStringAnsi(errPtr);
} }
[DllImport("taos.dll", EntryPoint = "taos_errno", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_errno", CallingConvention = CallingConvention.Cdecl)]
static extern public int ErrorNo(long taos); static extern public int ErrorNo(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_query", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
static extern public int Query(long taos, string sqlstr); static extern public IntPtr Query(IntPtr conn, string sqlstr);
[DllImport("taos.dll", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.Cdecl)]
static extern public int AffectRows(long taos); static extern public int AffectRows(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_use_result", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.Cdecl)]
static extern public long UseResult(long taos); static extern public int FieldCount(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.Cdecl)]
static extern public int FieldCount(long taos); static extern private IntPtr taos_fetch_fields(IntPtr res);
static public List<TDengineMeta> FetchFields(IntPtr res)
[DllImport("taos.dll", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.StdCall)]
static extern private IntPtr taos_fetch_fields(long res);
static public List<TDengineMeta> FetchFields(long taos)
{ {
const int fieldSize = 68; const int fieldSize = 68;
List<TDengineMeta> metas = new List<TDengineMeta>(); List<TDengineMeta> metas = new List<TDengineMeta>();
long result = TDengine.UseResult(taos); if (res == IntPtr.Zero)
if (result == 0)
{ {
return metas; return metas;
} }
int fieldCount = FieldCount(taos); int fieldCount = FieldCount(res);
IntPtr fieldsPtr = taos_fetch_fields(result); IntPtr fieldsPtr = taos_fetch_fields(res);
for (int i = 0; i < fieldCount; ++i) for (int i = 0; i < fieldCount; ++i)
{ {
@ -134,21 +134,21 @@ namespace TDengineDriver
TDengineMeta meta = new TDengineMeta(); TDengineMeta meta = new TDengineMeta();
meta.name = Marshal.PtrToStringAnsi(fieldsPtr + offset); meta.name = Marshal.PtrToStringAnsi(fieldsPtr + offset);
meta.size = Marshal.ReadInt16(fieldsPtr + offset + 64); meta.type = Marshal.ReadByte(fieldsPtr + offset + 65);
meta.type = Marshal.ReadByte(fieldsPtr + offset + 66); meta.size = Marshal.ReadInt16(fieldsPtr + offset + 66);
metas.Add(meta); metas.Add(meta);
} }
return metas; return metas;
} }
[DllImport("taos.dll", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FetchRows(long res); static extern public IntPtr FetchRows(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FreeResult(long res); static extern public IntPtr FreeResult(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_close", CallingConvention = CallingConvention.StdCall)] [DllImport("taos.dll", EntryPoint = "taos_close", CallingConvention = CallingConvention.Cdecl)]
static extern public int Close(long taos); static extern public int Close(IntPtr taos);
} }
} }

View File

@ -28,7 +28,7 @@ namespace TDengineDriver
private string configDir; private string configDir;
private string user; private string user;
private string password; private string password;
private int port = 0; private short port = 0;
//sql parameters //sql parameters
private string dbName; private string dbName;
@ -43,7 +43,7 @@ namespace TDengineDriver
private long batchRows; private long batchRows;
private long beginTimestamp = 1551369600000L; private long beginTimestamp = 1551369600000L;
private long conn = 0; private IntPtr conn = IntPtr.Zero;
private long rowsInserted = 0; private long rowsInserted = 0;
static void Main(string[] args) static void Main(string[] args)
@ -191,7 +191,7 @@ namespace TDengineDriver
{ {
string db = ""; string db = "";
this.conn = TDengine.Connect(this.host, this.user, this.password, db, this.port); this.conn = TDengine.Connect(this.host, this.user, this.password, db, this.port);
if (this.conn == 0) if (this.conn == IntPtr.Zero)
{ {
Console.WriteLine("Connect to TDengine failed"); Console.WriteLine("Connect to TDengine failed");
ExitProgram(); ExitProgram();
@ -211,58 +211,62 @@ namespace TDengineDriver
StringBuilder sql = new StringBuilder(); StringBuilder sql = new StringBuilder();
sql.Append("create database if not exists ").Append(this.dbName); sql.Append("create database if not exists ").Append(this.dbName);
int code = TDengine.Query(this.conn, sql.ToString()); IntPtr res = TDengine.Query(this.conn, sql.ToString());
if (code == TDengine.TSDB_CODE_SUCCESS) if (res != IntPtr.Zero)
{ {
Console.WriteLine(sql.ToString() + " success"); Console.WriteLine(sql.ToString() + " success");
} }
else else
{ {
Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(conn)); Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res));
ExitProgram(); ExitProgram();
} }
TDengine.FreeResult(res);
sql.Clear(); sql.Clear();
sql.Append("use ").Append(this.dbName); sql.Append("use ").Append(this.dbName);
code = TDengine.Query(this.conn, sql.ToString()); res = TDengine.Query(this.conn, sql.ToString());
if (code == TDengine.TSDB_CODE_SUCCESS) if (res != IntPtr.Zero)
{ {
Console.WriteLine(sql.ToString() + " success"); Console.WriteLine(sql.ToString() + " success");
} }
else else
{ {
Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res));
ExitProgram(); ExitProgram();
} }
TDengine.FreeResult(res);
sql.Clear(); sql.Clear();
sql.Append("create table if not exists ").Append(this.stableName).Append("(ts timestamp, v1 int) tags(t1 int)"); sql.Append("create table if not exists ").Append(this.stableName).Append("(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)");
code = TDengine.Query(this.conn, sql.ToString()); res = TDengine.Query(this.conn, sql.ToString());
if (code == TDengine.TSDB_CODE_SUCCESS) if (res != IntPtr.Zero)
{ {
Console.WriteLine(sql.ToString() + " success"); Console.WriteLine(sql.ToString() + " success");
} }
else else
{ {
Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res));
ExitProgram(); ExitProgram();
} }
TDengine.FreeResult(res);
for (int i = 0; i < this.tableCount; i++) for (int i = 0; i < this.tableCount; i++)
{ {
sql.Clear(); sql.Clear();
sql = sql.Append("create table if not exists ").Append(this.tablePrefix).Append(i) sql = sql.Append("create table if not exists ").Append(this.tablePrefix).Append(i)
.Append(" using ").Append(this.stableName).Append(" tags(").Append(i).Append(")"); .Append(" using ").Append(this.stableName).Append(" tags(").Append(i).Append(")");
code = TDengine.Query(this.conn, sql.ToString()); res = TDengine.Query(this.conn, sql.ToString());
if (code == TDengine.TSDB_CODE_SUCCESS) if (res != IntPtr.Zero)
{ {
Console.WriteLine(sql.ToString() + " success"); Console.WriteLine(sql.ToString() + " success");
} }
else else
{ {
Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res));
ExitProgram(); ExitProgram();
} }
TDengine.FreeResult(res);
} }
Console.WriteLine("create db and table success"); Console.WriteLine("create db and table success");
@ -287,16 +291,22 @@ namespace TDengineDriver
for (int batch = 0; batch < this.batchRows; ++batch) for (int batch = 0; batch < this.batchRows; ++batch)
{ {
long rows = loop * this.batchRows + batch; long rows = loop * this.batchRows + batch;
sql.Append("(").Append(this.beginTimestamp + rows).Append(",").Append(rows).Append(")"); sql.Append("(")
.Append(this.beginTimestamp + rows)
.Append(", 1, 2, 3,")
.Append(rows)
.Append(", 5, 6, 7, 'abc', 'def')");
} }
int code = TDengine.Query(conn, sql.ToString()); IntPtr res = TDengine.Query(this.conn, sql.ToString());
if (code != TDengine.TSDB_CODE_SUCCESS) if (res == IntPtr.Zero)
{ {
Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(conn)); Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res));
} }
int affectRows = TDengine.AffectRows(conn); int affectRows = TDengine.AffectRows(res);
this.rowsInserted += affectRows; this.rowsInserted += affectRows;
TDengine.FreeResult(res);
} }
} }
@ -316,51 +326,45 @@ namespace TDengineDriver
System.DateTime start = new System.DateTime(); System.DateTime start = new System.DateTime();
long queryRows = 0; long queryRows = 0;
for (int i = 0; i < this.tableCount; ++i) for (int i = 0; i < 1/*this.tableCount*/; ++i)
{ {
String sql = "select * from " + this.dbName + "." + tablePrefix + i; String sql = "select * from " + this.dbName + "." + tablePrefix + i;
Console.WriteLine(sql); Console.WriteLine(sql);
int code = TDengine.Query(conn, sql); IntPtr res = TDengine.Query(conn, sql);
if (code != TDengine.TSDB_CODE_SUCCESS) if (res == IntPtr.Zero)
{ {
Console.WriteLine(sql + " failure, reason: " + TDengine.Error(conn)); Console.WriteLine(sql + " failure, reason: " + TDengine.Error(res));
ExitProgram(); ExitProgram();
} }
int fieldCount = TDengine.FieldCount(conn); int fieldCount = TDengine.FieldCount(res);
//Console.WriteLine("field count: " + fieldCount); Console.WriteLine("field count: " + fieldCount);
List<TDengineMeta> metas = TDengine.FetchFields(conn); List<TDengineMeta> metas = TDengine.FetchFields(res);
for (int j = 0; j < metas.Count; j++) for (int j = 0; j < metas.Count; j++)
{ {
TDengineMeta meta = (TDengineMeta)metas[j]; TDengineMeta meta = (TDengineMeta)metas[j];
//Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size); Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size);
}
long result = TDengine.UseResult(conn);
if (result == 0)
{
Console.WriteLine(sql + " result set is null");
return;
} }
IntPtr rowdata; IntPtr rowdata;
while ((rowdata = TDengine.FetchRows(result)) != IntPtr.Zero) StringBuilder builder = new StringBuilder();
while ((rowdata = TDengine.FetchRows(res)) != IntPtr.Zero)
{ {
queryRows++; queryRows++;
for (int fields = 0; fields < fieldCount; ++fields) for (int fields = 0; fields < fieldCount; ++fields)
{ {
TDengineMeta meta = metas[fields]; TDengineMeta meta = metas[fields];
int offset = 8 * fields; int offset = IntPtr.Size * fields;
IntPtr data = Marshal.ReadIntPtr(rowdata, offset); IntPtr data = Marshal.ReadIntPtr(rowdata, offset);
//Console.Write("---"); builder.Append("---");
if (data == IntPtr.Zero) if (data == IntPtr.Zero)
{ {
//Console.Write("NULL"); builder.Append("NULL");
continue; continue;
} }
@ -368,55 +372,61 @@ namespace TDengineDriver
{ {
case TDengineDataType.TSDB_DATA_TYPE_BOOL: case TDengineDataType.TSDB_DATA_TYPE_BOOL:
bool v1 = Marshal.ReadByte(data) == 0 ? false : true; bool v1 = Marshal.ReadByte(data) == 0 ? false : true;
//Console.Write(v1); builder.Append(v1);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_TINYINT: case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
byte v2 = Marshal.ReadByte(data); byte v2 = Marshal.ReadByte(data);
//Console.Write(v2); builder.Append(v2);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_SMALLINT: case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
short v3 = Marshal.ReadInt16(data); short v3 = Marshal.ReadInt16(data);
//Console.Write(v3); builder.Append(v3);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_INT: case TDengineDataType.TSDB_DATA_TYPE_INT:
int v4 = Marshal.ReadInt32(data); int v4 = Marshal.ReadInt32(data);
//Console.Write(v4); builder.Append(v4);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_BIGINT: case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
long v5 = Marshal.ReadInt64(data); long v5 = Marshal.ReadInt64(data);
//Console.Write(v5); builder.Append(v5);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_FLOAT: case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
float v6 = (float)Marshal.PtrToStructure(data, typeof(float)); float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
//Console.Write(v6); builder.Append(v6);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_DOUBLE: case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
double v7 = (double)Marshal.PtrToStructure(data, typeof(double)); double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
//Console.Write(v7); builder.Append(v7);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_BINARY: case TDengineDataType.TSDB_DATA_TYPE_BINARY:
string v8 = Marshal.PtrToStringAnsi(data); string v8 = Marshal.PtrToStringAnsi(data);
//Console.Write(v8); builder.Append(v8);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP: case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
long v9 = Marshal.ReadInt64(data); long v9 = Marshal.ReadInt64(data);
//Console.Write(v9); builder.Append(v9);
break; break;
case TDengineDataType.TSDB_DATA_TYPE_NCHAR: case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
string v10 = Marshal.PtrToStringAnsi(data); string v10 = Marshal.PtrToStringAnsi(data);
//Console.Write(v10); builder.Append(v10);
break; break;
} }
} }
//Console.WriteLine("---"); builder.Append("---");
if (queryRows <= 10)
{
Console.WriteLine(builder.ToString());
}
builder.Clear();
} }
if (TDengine.ErrorNo(conn) != 0) if (TDengine.ErrorNo(res) != 0)
{ {
Console.Write("Query is not complete Error {0:G}", TDengine.ErrorNo(conn), TDengine.Error(conn)); Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res));
} }
TDengine.FreeResult(result); TDengine.FreeResult(res);
} }
System.DateTime end = new System.DateTime(); System.DateTime end = new System.DateTime();
@ -428,14 +438,15 @@ namespace TDengineDriver
public void CloseConnection() public void CloseConnection()
{ {
if (conn != 0) if (this.conn != IntPtr.Zero)
{ {
TDengine.Close(conn); TDengine.Close(this.conn);
} }
} }
static void ExitProgram() static void ExitProgram()
{ {
TDengine.Cleanup();
System.Environment.Exit(0); System.Environment.Exit(0);
} }
} }

View File

@ -32,10 +32,16 @@ system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode4 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1
# for crash_gen
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101
system sh/cfg.sh -n dnode1 -c cache -v 2
system sh/cfg.sh -n dnode1 -c keep -v 36500
system sh/cfg.sh -n dnode1 -c walLevel -v 2 system sh/cfg.sh -n dnode1 -c walLevel -v 2
# for windows
system sh/cfg.sh -n dnode1 -c firstEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c firstEp -v 152.136.17.116:6030
system sh/cfg.sh -n dnode1 -c secondEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 152.136.17.116:6030
system sh/cfg.sh -n dnode1 -c serverPort -v 6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030

View File

@ -96,20 +96,20 @@ enum {
struct _script_t; struct _script_t;
typedef struct _cmd_t { typedef struct _cmd_t {
short cmdno; int16_t cmdno;
short nlen; int16_t nlen;
char name[MAX_SIM_CMD_NAME_LEN]; char name[MAX_SIM_CMD_NAME_LEN];
bool (*parseCmd)(char *, struct _cmd_t *, int); bool (*parseCmd)(char *, struct _cmd_t *, int);
bool (*executeCmd)(struct _script_t *script, char *option); bool (*executeCmd)(struct _script_t *script, char *option);
struct _cmd_t *next; struct _cmd_t *next;
} SCommand; } SCommand;
typedef struct { typedef struct {
short cmdno; int16_t cmdno;
short jump; // jump position int16_t jump; // jump position
short errorJump; // sql jump flag, while '-x' exist in sql cmd, this flag int16_t errorJump; // sql jump flag, while '-x' exist in sql cmd, this flag
// will be SQL_JUMP_TRUE, otherwise is SQL_JUMP_FALSE */ // will be SQL_JUMP_TRUE, otherwise is SQL_JUMP_FALSE */
short lineNum; // correspodning line number in original file int16_t lineNum; // correspodning line number in original file
int optionOffset; // relative option offset int optionOffset; // relative option offset
} SCmdLine; } SCmdLine;
@ -120,7 +120,7 @@ typedef struct _var_t {
} SVariable; } SVariable;
typedef struct _script_t { typedef struct _script_t {
int type; int type;
bool killed; bool killed;
void *taos; void *taos;
@ -130,10 +130,10 @@ typedef struct _script_t {
char system_exit_code[12]; char system_exit_code[12];
char system_ret_content[MAX_SYSTEM_RESULT_LEN]; char system_ret_content[MAX_SYSTEM_RESULT_LEN];
int varLen; int varLen;
int linePos; // current cmd position int linePos; // current cmd position
int numOfLines; // number of lines in the script int numOfLines; // number of lines in the script
int bgScriptLen; int bgScriptLen;
char fileName[MAX_FILE_NAME_LEN]; // script file name char fileName[MAX_FILE_NAME_LEN]; // script file name
char error[MAX_ERROR_LEN]; char error[MAX_ERROR_LEN];
char *optionBuffer; char *optionBuffer;

View File

@ -33,21 +33,21 @@ enum {
/* label stack */ /* label stack */
typedef struct { typedef struct {
char top; /* number of labels */ char top; /* number of labels */
short pos[MAX_NUM_LABLES]; /* the position of the label */ int16_t pos[MAX_NUM_LABLES]; /* the position of the label */
char label[MAX_NUM_LABLES][MAX_LABEL_LEN]; /* name of the label */ char label[MAX_NUM_LABLES][MAX_LABEL_LEN]; /* name of the label */
} SLabel; } SLabel;
/* block definition */ /* block definition */
typedef struct { typedef struct {
char top; /* the number of blocks stacked */ char top; /* the number of blocks stacked */
char type[MAX_NUM_BLOCK]; /* the block type */ char type[MAX_NUM_BLOCK]; /* the block type */
short *pos[MAX_NUM_BLOCK]; /* position of the jump for if/elif/case */ int16_t *pos[MAX_NUM_BLOCK]; /* position of the jump for if/elif/case */
short back[MAX_NUM_BLOCK]; /* go back, endw and continue */ int16_t back[MAX_NUM_BLOCK]; /* go back, endw and continue */
char numJump[MAX_NUM_BLOCK]; char numJump[MAX_NUM_BLOCK];
short *jump[MAX_NUM_BLOCK][MAX_NUM_JUMP]; /* break or elif */ int16_t *jump[MAX_NUM_BLOCK][MAX_NUM_JUMP]; /* break or elif */
char sexp[MAX_NUM_BLOCK][40]; /*switch expression */ char sexp[MAX_NUM_BLOCK][40]; /*switch expression */
char sexpLen[MAX_NUM_BLOCK]; /*switch expression length */ char sexpLen[MAX_NUM_BLOCK]; /*switch expression length */
} SBlock; } SBlock;
bool simParseExpression(char *token, int lineNum); bool simParseExpression(char *token, int lineNum);