add some os files

This commit is contained in:
Shengliang Guan 2021-09-30 16:41:02 +08:00
parent a0aa00b51d
commit 7af18fc96c
29 changed files with 1596 additions and 218 deletions

6
deps/CMakeLists.txt vendored
View File

@ -8,7 +8,13 @@ target_include_directories(
# see https://stackoverflow.com/questions/25676277/cmake-target-include-directories-prints-an-error-when-i-try-to-add-the-source
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cJson>
)
add_subdirectory(lz4/build/cmake)
target_include_directories(
lz4_static
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/lz4/lib
)
add_subdirectory(zlib)
target_include_directories(
zlib

View File

@ -41,15 +41,6 @@ extern "C" {
// Bytes for each type.
extern const int32_t TYPE_BYTES[15];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY)
#if LINUX

View File

@ -21,22 +21,29 @@ extern "C" {
#endif
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <float.h>
#include <inttypes.h>
#include <sched.h>
#include <locale.h>
#include <math.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include<sys/time.h>
// #include <arpa/inet.h>
// #include <fcntl.h>
// #include <pthread.h>
// // #include <sched.h>
//
// #include <signal.h>
// #include <sys/time.h>
// #include <unistd.h>
#include "osAtomic.h"
#include "osDef.h"
@ -48,7 +55,9 @@ extern "C" {
#include "osSemaphore.h"
#include "osSocket.h"
#include "osString.h"
#include "osSleep.h"
#include "osTime.h"
#include "osThread.h"
#ifdef __cplusplus
}

View File

@ -29,11 +29,6 @@ extern "C" {
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
#define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2))
#ifndef PATH_MAX
#define PATH_MAX 1024
#endif
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// MacOS
@ -53,6 +48,16 @@ extern "C" {
#endif
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#ifdef __cplusplus
}
#endif

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_FILE_H
#define TDENGINE_OS_FILE_H
#ifndef _TD_OS_FILE_H_
#define _TD_OS_FILE_H_
#ifdef __cplusplus
extern "C" {
@ -22,13 +22,6 @@ extern "C" {
#include "osSocket.h"
#define FD_VALID(x) ((x) > STDERR_FILENO)
#define FD_INITIALIZER ((int32_t)-1)
#ifndef PATH_MAX
#define PATH_MAX 256
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
typedef int32_t FileFd;
typedef SOCKET SocketFd;
@ -37,40 +30,44 @@ typedef int32_t FileFd;
typedef int32_t SocketFd;
#endif
int64_t taosRead(FileFd fd, void *buf, int64_t count);
int64_t taosWrite(FileFd fd, void *buf, int64_t count);
#define FD_INITIALIZER ((int32_t)-1)
int64_t taosLSeek(FileFd fd, int64_t offset, int32_t whence);
int32_t taosFtruncate(FileFd fd, int64_t length);
int32_t taosFsync(FileFd fd);
#ifndef PATH_MAX
#define PATH_MAX 256
#endif
int32_t taosRename(char* oldName, char *newName);
int64_t taosCopy(char *from, char *to);
int32_t taosLockFile(FileFd fd);
int32_t taosUnLockFile(FileFd fd);
int32_t taosUmaskFile(FileFd fd);
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime);
int32_t taosFStatFile(FileFd fd, int64_t *size, int32_t *mtime);
FileFd taosOpenFileWrite(const char *path);
FileFd taosOpenFileCreateWrite(const char *path);
FileFd taosOpenFileTruncCreateWrite(const char *path);
FileFd taosOpenFileRead(const char *path);
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(FileFd fd, int64_t length);
int32_t taosFsyncFile(FileFd fd);
int64_t taosReadFile(FileFd fd, void *buf, int64_t count);
int64_t taosWriteFile(FileFd fd, void *buf, int64_t count);
void taosCloseFile(FileFd fd);
int32_t taosRenameFile(char *oldName, char *newName);
int64_t taosCopyFile(char *from, char *to);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath);
void taosClose(FileFd fd);
#ifdef TAOS_RANDOM_FILE_FAIL
void taosSetRandomFileFailFactor(int32_t factor);
void taosSetRandomFileFailOutput(const char *path);
#ifdef TAOS_RANDOM_FILE_FAIL_TEST
int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line);
#undef taosRead
#undef taosWrite
#undef taosLSeek
#define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
#endif
#endif
#ifdef __cplusplus
}
#endif
#endif
#endif /*_TD_OS_FILE_H_*/

31
include/os/osRand.h Normal file
View File

@ -0,0 +1,31 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_RAND_H
#define TDENGINE_OS_RAND_H
#ifdef __cplusplus
extern "C" {
#endif
uint32_t taosRand(void);
void taosRandStr(char* str, int32_t size);
uint32_t taosSafeRand(void);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
#include <semaphore.h>
#if defined (_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);

29
include/os/osSleep.h Normal file
View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SLEEP_H
#define TDENGINE_OS_SLEEP_H
#ifdef __cplusplus
extern "C" {
#endif
void taosMsleep(int32_t ms);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -20,6 +20,19 @@
extern "C" {
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h"
#include <WS2tcpip.h>
#include <winbase.h>
#include <Winsock2.h>
#else
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <unistd.h>
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
#define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) sendto((SOCKET)sockfd, buf, len, flags, dest_addr, addrlen)
@ -35,50 +48,20 @@ extern "C" {
#define taosCloseSocketNoCheck(x) close(x)
#define taosCloseSocket(x) \
{ \
if (FD_VALID(x)) { \
if ((x) > -1) { \
close(x); \
x = FD_INITIALIZER; \
} \
}
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define TAOS_EPOLL_WAIT_TIME 100
typedef SOCKET eventfd_t;
#define eventfd(a, b) -1
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
#elif defined(_TD_DARWIN_64)
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#else
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#endif
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
#undef taosWriteSocket
#define taosSend(sockfd, buf, len, flags) taosSendRandomFail(sockfd, buf, len, flags)
#define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) taosSendToRandomFail(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosReadSocket(fd, buf, len) taosReadSocketRandomFail(fd, buf, len)
#define taosWriteSocket(fd, buf, len) taosWriteSocketRandomFail(fd, buf, len)
#endif
#endif
void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
@ -88,9 +71,7 @@ int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *op
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t* optlen);
uint32_t taosInetAddr(char *ipAddr);
#if 0
const char *taosInetNtoa(struct in_addr ipInt);
#endif
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
#define htobe64 htonll

30
include/os/osThread.h Normal file
View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_OS_THREAD_H_
#define _TD_OS_THREAD_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <pthread.h>
#include <semaphore.h>
#ifdef __cplusplus
}
#endif
#endif /*_TD_OS_THREAD_H_*/

View File

@ -20,20 +20,22 @@
extern "C" {
#endif
int32_t taosGetTimeOfDay(struct timeval *tv);
//@return timestamp in second
int32_t taosGetTimestampSec();
//@return timestamp in millisecond
static FORCE_INLINE int64_t taosGetTimestampMs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
taosGetTimeOfDay(&systemTime);
return (int64_t)systemTime.tv_sec * 1000L + (int64_t)systemTime.tv_usec / 1000;
}
//@return timestamp in microsecond
static FORCE_INLINE int64_t taosGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
taosGetTimeOfDay(&systemTime);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}

43
include/util/tidpool.h Normal file
View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TIDPOOL_H
#define TDENGINE_TIDPOOL_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosInitIdPool(int maxId);
int taosUpdateIdPool(void *handle, int maxId);
int taosIdPoolMaxSize(void *handle);
int taosAllocateId(void *handle);
void taosFreeId(void *handle, int id);
void taosIdPoolCleanUp(void *handle);
int taosIdPoolNumOfUsed(void *handle);
bool taosIdPoolMarkStatus(void *handle, int id);
#ifdef __cplusplus
}
#endif
#endif

36
include/util/tmempool.h Normal file
View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TMEMPOOL_H
#define TDENGINE_TMEMPOOL_H
#ifdef __cplusplus
extern "C" {
#endif
#define mpool_h void *
mpool_h taosMemPoolInit(int maxNum, int blockSize);
char *taosMemPoolMalloc(mpool_h handle);
void taosMemPoolFree(mpool_h handle, char *p);
void taosMemPoolCleanUp(mpool_h handle);
#ifdef __cplusplus
}
#endif
#endif

77
include/util/tref.h Normal file
View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TREF_H
#define TDENGINE_TREF_H
#ifdef __cplusplus
extern "C" {
#endif
// open a reference set, max is the mod used by hash, fp is the pointer to free resource function
// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int taosOpenRef(int max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosCloseRef(int refId);
// add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t taosAddRef(int refId, void *p);
// remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosRemoveRef(int rsetId, int64_t rid);
// acquire ref, rid is the reference ID returned by taosAddRef
// return the resource p. On error, NULL is returned, and terrno is set appropriately
void *taosAcquireRef(int rsetId, int64_t rid);
// release ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosReleaseRef(int rsetId, int64_t rid);
// return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
void *taosIterateRef(int rsetId, int64_t rid);
// return the number of references in system
int taosListRef();
#define RID_VALID(x) ((x) > 0)
/* sample code to iterate the refs
void demoIterateRefs(int rsetId) {
void *p = taosIterateRef(refId, 0);
while (p) {
// process P
// get the rid from p
p = taosIterateRef(rsetId, rid);
}
}
*/
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H

View File

@ -23,7 +23,6 @@ extern "C" {
#include "os.h"
#include "tmd5.h"
#include "tcrc32c.h"
#include "taosdef.h"
int32_t strdequote(char *src);
int32_t strRmquote(char *z, int32_t len);
@ -46,14 +45,19 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, in
char * taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target, int32_t keylen) {
MD5_CTX context;
MD5Init(&context);
MD5Update(&context, inBuf, (unsigned int)inLen);
MD5Final(&context);
memcpy(target, context.digest, TSDB_KEY_LEN);
memcpy(target, context.digest, keylen);
}
#ifdef tListLen
#undefine tListLen
#endif
#define tListLen(x) (sizeof(x) / sizeof((x)[0]))
#ifdef __cplusplus
}
#endif

View File

@ -4,4 +4,12 @@ target_include_directories(
transport
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
transport
PUBLIC lz4_static
PUBLIC os
PUBLIC util
PUBLIC common
)

View File

@ -22,6 +22,9 @@
#include "rpcHead.h"
#include "rpcTcp.h"
#include <sys/epoll.h>
typedef struct SFdObj {
void *signature;
SOCKET fd; // TCP socket FD
@ -195,16 +198,7 @@ void taosStopTcpServer(void *handle) {
pServerObj->stop = 1;
if (pServerObj->fd >= 0) {
#ifdef WINDOWS
closesocket(pServerObj->fd);
#elif defined(__APPLE__)
if (pServerObj->fd!=-1) {
close(pServerObj->fd);
pServerObj->fd = -1;
}
#else
shutdown(pServerObj->fd, SHUT_RD);
#endif
taosShutDownSocketRD(pServerObj->fd);
}
if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) {
@ -267,8 +261,8 @@ static void *taosAcceptTcpConnection(void *arg) {
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) {
taosCloseSocket(connFd);
tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
tError("%s failed to set recv timeout fd(%s)for connection from:%hu", pServerObj->label, strerror(errno),
htons(caddr.sin_port));
continue;
}
@ -280,12 +274,12 @@ static void *taosAcceptTcpConnection(void *arg) {
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
tDebug("%s new TCP connection from %hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else {
taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
tError("%s failed to malloc FdObj(%s) for connection from:%hu", pServerObj->label, strerror(errno),
htons(caddr.sin_port));
}
// pick up next thread for next connection
@ -436,7 +430,7 @@ void taosCloseTcpConnection(void *chandle) {
// pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
shutdown(pFdObj->fd, SHUT_WR);
taosShutDownSocketWR(pFdObj->fd);
}
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
@ -456,7 +450,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context
if (pFdObj->closedByApp == 0) {
shutdown(pFdObj->fd, SHUT_WR);
taosShutDownSocketWR(pFdObj->fd);
SRecvInfo recvInfo;
recvInfo.msg = NULL;

View File

@ -13,24 +13,29 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tulog.h"
void taosClose(FileFd fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
#include <fcntl.h>
#include <sys/file.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
#endif
void taosCloseFile(FileFd fd) {
close(fd);
fd = FD_INITIALIZER;
}
void taosGetTmpfilePath(const char * inputTmpDir, const char *fileNamePrefix, char *dstPath) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char *tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
int32_t len = (int32_t)strlen(tsTempDir);
memcpy(tmpPath, tsTempDir, len);
int32_t len = (int32_t)strlen(inputTmpDir);
memcpy(tmpPath, inputTmpDir, len);
if (tmpPath[len - 1] != '/' && tmpPath[len - 1] != '\\') {
tmpPath[len++] = '\\';
@ -46,16 +51,14 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
char rand[8] = {0};
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
#else
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char *tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX];
int32_t len = strlen(tsTempDir);
memcpy(tmpPath, tsTempDir, len);
char tmpPath[PATH_MAX];
int32_t len = strlen(inputTmpDir);
memcpy(tmpPath, inputTmpDir, len);
static uint64_t seqId = 0;
if (tmpPath[len - 1] != '/') {
@ -73,11 +76,11 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
sprintf(rand, "%" PRIu64, atomic_add_fetch_64(&seqId, 1));
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
#endif
}
int64_t taosRead(FileFd fd, void *buf, int64_t count) {
int64_t taosReadFile(FileFd fd, void *buf, int64_t count) {
int64_t leftbytes = count;
int64_t readbytes;
char * tbuf = (char *)buf;
@ -101,7 +104,7 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count) {
return count;
}
int64_t taosWrite(FileFd fd, void *buf, int64_t n) {
int64_t taosWriteFile(FileFd fd, void *buf, int64_t n) {
int64_t nleft = n;
int64_t nwritten = 0;
char * tbuf = (char *)buf;
@ -121,42 +124,46 @@ int64_t taosWrite(FileFd fd, void *buf, int64_t n) {
return n;
}
int64_t taosLSeek(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); }
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); }
int64_t taosCopy(char *from, char *to) {
int64_t taosCopyFile(char *from, char *to) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
char buffer[4096];
int fidto = -1, fidfrom = -1;
int64_t size = 0;
int64_t bytes;
fidfrom = open(from, O_RDONLY | O_BINARY);
fidfrom = open(from, O_RDONLY);
if (fidfrom < 0) goto _err;
fidto = open(to, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755);
fidto = open(to, O_WRONLY | O_CREAT | O_EXCL, 0755);
if (fidto < 0) goto _err;
while (true) {
bytes = taosRead(fidfrom, buffer, sizeof(buffer));
bytes = taosReadFile(fidfrom, buffer, sizeof(buffer));
if (bytes < 0) goto _err;
if (bytes == 0) break;
size += bytes;
if (taosWrite(fidto, (void *)buffer, bytes) < bytes) goto _err;
if (taosWriteFile(fidto, (void *)buffer, bytes) < bytes) goto _err;
if (bytes < sizeof(buffer)) break;
}
taosFsync(fidto);
taosFsyncFile(fidto);
taosClose(fidfrom);
taosClose(fidto);
taosCloseFile(fidfrom);
taosCloseFile(fidto);
return size;
_err:
if (fidfrom >= 0) taosClose(fidfrom);
if (fidto >= 0) taosClose(fidto);
if (fidfrom >= 0) taosCloseFile(fidfrom);
if (fidto >= 0) taosCloseFile(fidto);
remove(to);
return -1;
#endif
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
@ -306,9 +313,8 @@ int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size
#endif
int32_t taosFtruncateFile(FileFd fd, int64_t l_size) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t taosFtruncate(int32_t fd, int64_t l_size) {
if (fd < 0) {
errno = EBADF;
uError("%s\n", "fd arg was negative");
@ -357,9 +363,13 @@ int32_t taosFtruncate(int32_t fd, int64_t l_size) {
}
return 0;
#else
return ftruncate(fd, l_size);
#endif
}
int32_t taosFsync(FileFd fd) {
int32_t taosFsyncFile(FileFd fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
if (fd < 0) {
errno = EBADF;
uError("%s\n", "fd arg was negative");
@ -369,33 +379,126 @@ int32_t taosFsync(FileFd fd) {
HANDLE h = (HANDLE)_get_osfhandle(fd);
return FlushFileBuffers(h);
#else
return fsync(fd);
#endif
}
int32_t taosRename(char *oldName, char *newName) {
int32_t taosRenameFile(char *oldName, char *newName) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
if (code < 0) {
uError("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
} else {
uTrace("successfully to rename file %s to %s", oldName, newName);
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
}
return code;
}
#else
int32_t taosFtruncate(FileFd fd, int64_t length) { return ftruncate(fd, length); }
int32_t taosFsync(FileFd fd) { return fsync(fd); }
int32_t taosRename(char *oldName, char *newName) {
int32_t code = rename(oldName, newName);
if (code < 0) {
uError("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
} else {
uTrace("successfully to rename file %s to %s", oldName, newName);
printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno));
}
return code;
#endif
}
int32_t taosLockFile(int32_t fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return (int32_t)flock(fd, LOCK_EX | LOCK_NB);
#endif
}
int32_t taosUnLockFile(int32_t fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return (int32_t)flock(fd, LOCK_UN | LOCK_NB);
#endif
}
int32_t taosUmaskFile(int32_t val) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return umask(val);
#endif
}
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
struct stat fileStat;
int32_t code = stat(path, &fileStat);
if (code < 0) {
return code;
}
if (size != NULL) {
*size = fileStat.st_size;
}
if (mtime != NULL) {
*mtime = fileStat.st_mtime;
}
return 0;
#endif
}
int32_t taosFStatFile(int32_t fd, int64_t *size, int32_t *mtime) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
struct stat fileStat;
int32_t code = fstat(fd, &fileStat);
if (code < 0) {
return code;
}
if (size != NULL) {
*size = fileStat.st_size;
}
if (mtime != NULL) {
*mtime = fileStat.st_mtime;
}
return 0;
#endif
}
int32_t taosOpenFileWrite(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
FileFd taosOpenFileRead(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_RDONLY);
#endif
}
int32_t taosOpenFileCreateWrite(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
int32_t taosOpenFileTruncCreateWrite(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}

View File

@ -15,7 +15,26 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tulog.h"
void taosShutDownSocketRD(SOCKET fd) {
#ifdef WINDOWS
closesocket(fd);
#elif __APPLE__
close(fd);
#else
shutdown(fd, SHUT_RD);
#endif
}
void taosShutDownSocketWR(SOCKET fd) {
#ifdef WINDOWS
closesocket(fd);
#elif __APPLE__
close(fd);
#else
shutdown(fd, SHUT_WR);
#endif
}
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
@ -87,4 +106,19 @@ const char *taosInetNtoa(struct in_addr ipInt) {
return inet_ntoa(ipInt);
}
#else
const char *taosInetNtoa(struct in_addr ipInt) {
// not thread safe, only for debug usage while print log
static char tmpDstStr[16];
return inet_ntop(AF_INET, &ipInt, tmpDstStr, INET6_ADDRSTRLEN);
}
#endif
#if defined(_TD_GO_DLL_)
uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); }
#endif

37
source/os/src/osTime.c Normal file
View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <winsock2.h>
#else
#endif
FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
time_t t;
t = time(NULL);
SYSTEMTIME st;
GetLocalTime(&st);
tv->tv_sec = (long)t;
tv->tv_usec = st.wMilliseconds * 1000;
return 0;
#else
return gettimeofday(tv, NULL);
#endif
}

View File

@ -94,7 +94,7 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) {
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsync(fileno(fp));
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;

View File

@ -173,7 +173,7 @@ static int32_t dnodeWriteEps(DnEps *eps) {
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsync(fileno(fp));
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;

View File

@ -82,7 +82,7 @@ static int32_t dnodeWriteMnodeEps(DnMnEps *meps) {
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsync(fileno(fp));
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;

View File

@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
// #include "osTime.h"
#include "tbuffer.h"
#include "tglobal.h"
#include "tsocket.h"
@ -255,16 +254,16 @@ static void* dnodeTelemThreadFp(void* param) {
}
static void dnodeGetEmail(DnTelem* telem, char* filepath) {
int32_t fd = open(filepath, O_RDONLY);
int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) {
return;
}
if (taosRead(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) {
if (taosReadFile(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
}
taosClose(fd);
taosCloseFile(fd);
}
int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) {

View File

@ -242,7 +242,7 @@ static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *p
static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, "nettestinternal") == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
taosEncryptPass((uint8_t *)user, strlen(user), pass, TSDB_KEY_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;

View File

@ -3,7 +3,7 @@ add_library(util ${UTIL_SRC})
target_include_directories(
util
PUBLIC "${CMAKE_SOURCE_DIR}/include/util"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
util

42
source/util/inc/tulog.h Normal file
View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_COMMON_ULOG_H
#define TDENGINE_COMMON_ULOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
extern int32_t uDebugFlag;
extern int8_t tscEmbedded;
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
#ifdef __cplusplus
}
#endif
#endif

916
source/util/src/hash.c Normal file
View File

@ -0,0 +1,916 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "hash.h"
#include "tulog.h"
// #include "taosdef.h"
#define EXT_SIZE 1024
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define DO_FREE_HASH_NODE(_n) \
do { \
tfree(_n); \
} while (0)
#define FREE_HASH_NODE(_h, _n) \
do { \
if ((_h)->freeFp) { \
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
} \
\
DO_FREE_HASH_NODE(_n); \
} while (0);
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosWLockLatch(lock);
}
static FORCE_INLINE void __rd_lock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosRLockLatch(lock);
}
static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosRUnLockLatch(lock);
}
static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosWUnLockLatch(lock);
}
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
int32_t len = MIN(length, HASH_MAX_CAPACITY);
int32_t i = 4;
while (i < len) i = (i << 1u);
return i;
}
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
SHashNode *pNode = pe->next;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
break;
}
pNode = pNode->next;
}
return pNode;
}
/**
* Resize the hash list if the threshold is reached
*
* @param pHashObj
*/
static void taosHashTableResize(SHashObj *pHashObj);
/**
* @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key
* @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData.
* Pointer copy causes memory access error.
* @param dsize size of data
* @return SHashNode
*/
static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal);
/**
* Update the hash node
*
* @param pNode hash node
* @param key key for generate hash value
* @param keyLen key length
* @param pData actual data
* @param dsize size of actual data
* @return hash node
*/
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen);
pNode->count--;
if (prev != NULL) {
prev->next = pNewNode;
} else {
pe->next = pNewNode;
}
if (pNode->count <= 0) {
pNewNode->next = pNode->next;
DO_FREE_HASH_NODE(pNode);
} else {
pNewNode->next = pNode;
pe->num++;
atomic_add_fetch_64(&pHashObj->size, 1);
}
return pNewNode;
}
/**
* insert the hash node at the front of the linked list
*
* @param pHashObj
* @param pNode
*/
static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
/**
* Check whether the hash table is empty or not.
*
* @param pHashObj the hash table object
* @return if the hash table is empty or not
*/
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
/**
* Get the next element in hash table for iterator
* @param pIter
* @return
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
assert(fn != NULL);
if (capacity == 0) {
capacity = 4;
}
SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj));
if (pHashObj == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
// the max slots is not defined by user
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn;
pHashObj->type = type;
pHashObj->enableUpdate = update;
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *));
if (pHashObj->hashList == NULL) {
free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
} else {
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *));
void *p = calloc(pHashObj->capacity, sizeof(SHashEntry));
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry));
}
taosArrayPush(pHashObj->pMemBlock, &p);
}
return pHashObj;
}
void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp) {
if (pHashObj != NULL && fp != NULL) {
pHashObj->equalFp = fp;
}
}
int32_t taosHashGetSize(const SHashObj *pHashObj) {
if (!pHashObj) {
return 0;
}
return (int32_t)atomic_load_64(&pHashObj->size);
}
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
return taosHashGetSize(pHashObj) == 0;
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
return -1;
}
// need the resize process, write lock applied
if (HASH_NEED_RESIZE(pHashObj)) {
__wr_lock(&pHashObj->lock, pHashObj->type);
taosHashTableResize(pHashObj);
__wr_unlock(&pHashObj->lock, pHashObj->type);
}
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
SHashNode *pNode = pe->next;
if (pe->num > 0) {
assert(pNode != NULL);
} else {
assert(pNode == NULL);
}
SHashNode* prev = NULL;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
break;
}
prev = pNode;
pNode = pNode->next;
}
if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table
pushfrontNodeInEntryList(pe, pNewNode);
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
atomic_add_fetch_64(&pHashObj->size, 1);
return 0;
} else {
// not support the update operation, return error
if (pHashObj->enableUpdate) {
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
DO_FREE_HASH_NODE(pNewNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
return pHashObj->enableUpdate ? 0 : -1;
}
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL);
}
//TODO(yihaoDeng), merge with taosHashGetClone
void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
return NULL;
}
char *data = NULL;
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
if (pe->num > 0) {
assert(pe->next != NULL);
} else {
assert(pe->next == NULL);
}
SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal);
if (pNode != NULL) {
if (fp != NULL) {
fp(GET_HASH_NODE_DATA(pNode));
}
if (*d == NULL) {
*sz = pNode->dataLen + EXT_SIZE;
*d = calloc(1, *sz);
} else if (*sz < pNode->dataLen){
*sz = pNode->dataLen + EXT_SIZE;
*d = realloc(*d, *sz);
}
memcpy((char *)(*d), GET_HASH_NODE_DATA(pNode), pNode->dataLen);
// just make runtime happy
if ((*sz) - pNode->dataLen > 0) {
memset((char *)(*d) + pNode->dataLen, 0, (*sz) - pNode->dataLen);
}
data = GET_HASH_NODE_DATA(pNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
}
void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) {
if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
return NULL;
}
char *data = NULL;
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
if (pe->num > 0) {
assert(pe->next != NULL);
} else {
assert(pe->next == NULL);
}
SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal);
if (pNode != NULL) {
if (fp != NULL) {
fp(GET_HASH_NODE_DATA(pNode));
}
if (d != NULL) {
memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen);
}
data = GET_HASH_NODE_DATA(pNode);
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
}
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
}
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
// double check after locked
if (pe->num == 0) {
assert(pe->next == NULL);
taosWUnLockLatch(&pe->latch);
__rd_unlock(&pHashObj->lock, pHashObj->type);
return -1;
}
int code = -1;
SHashNode *pNode = pe->next;
SHashNode *prevNode = NULL;
while (pNode) {
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0)
break;
prevNode = pNode;
pNode = pNode->next;
}
if (pNode) {
code = 0; // it is found
pNode->count--;
pNode->removed = 1;
if (pNode->count <= 0) {
if (prevNode) {
prevNode->next = pNode->next;
} else {
pe->next = pNode->next;
}
if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize);
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pNode);
}
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return code;
}
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return 0;
}
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t numOfEntries = (int32_t)pHashObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
continue;
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pEntry->latch);
}
// todo remove the first node
SHashNode *pNode = NULL;
while((pNode = pEntry->next) != NULL) {
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) {
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
pEntry->next = pNode->next;
if (pEntry->num == 0) {
assert(pEntry->next == NULL);
} else {
assert(pEntry->next != NULL);
}
FREE_HASH_NODE(pHashObj, pNode);
} else {
break;
}
}
// handle the following node
if (pNode != NULL) {
assert(pNode == pEntry->next);
SHashNode *pNext = NULL;
while ((pNext = pNode->next) != NULL) {
// not qualified, remove it
if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) {
pNode->next = pNext->next;
pEntry->num -= 1;
atomic_sub_fetch_64(&pHashObj->size, 1);
if (pEntry->num == 0) {
assert(pEntry->next == NULL);
} else {
assert(pEntry->next != NULL);
}
FREE_HASH_NODE(pHashObj, pNext);
} else {
pNode = pNext;
}
}
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pEntry->latch);
}
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return 0;
}
void taosHashClear(SHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
SHashNode *pNode, *pNext;
__wr_lock(&pHashObj->lock, pHashObj->type);
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
assert(pEntry->next == 0);
continue;
}
pNode = pEntry->next;
assert(pNode != NULL);
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pHashObj, pNode);
pNode = pNext;
}
pEntry->num = 0;
pEntry->next = NULL;
}
pHashObj->size = 0;
__wr_unlock(&pHashObj->lock, pHashObj->type);
}
void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL) {
return;
}
taosHashClear(pHashObj);
tfree(pHashObj->hashList);
// destroy mem block
size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock);
for (int32_t i = 0; i < memBlock; ++i) {
void *p = taosArrayGetP(pHashObj->pMemBlock, i);
tfree(p);
}
taosArrayDestroy(pHashObj->pMemBlock);
memset(pHashObj, 0, sizeof(SHashObj));
free(pHashObj);
}
// for profile only
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
return 0;
}
int32_t num = 0;
for (int32_t i = 0; i < pHashObj->size; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (num < pEntry->num) {
num = pEntry->num;
}
}
return num;
}
void taosHashTableResize(SHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) {
return;
}
// double the original capacity
SHashNode *pNode = NULL;
SHashNode *pNext = NULL;
int32_t newSize = (int32_t)(pHashObj->capacity << 1u);
if (newSize > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize);
if (pNewEntryList == NULL) { // todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return;
}
pHashObj->hashList = pNewEntryList;
size_t inc = newSize - pHashObj->capacity;
void * p = calloc(inc, sizeof(SHashEntry));
for (int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry));
}
taosArrayPush(pHashObj->pMemBlock, &p);
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pe = pHashObj->hashList[i];
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
if (pe->num == 0) {
assert(pe->next == NULL);
continue;
}
while ((pNode = pe->next) != NULL) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j != i) {
pe->num -= 1;
pe->next = pNode->next;
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
SHashEntry *pNewEntry = pHashObj->hashList[j];
pushfrontNodeInEntryList(pNewEntry, pNode);
} else {
break;
}
}
if (pNode != NULL) {
while ((pNext = pNode->next) != NULL) {
int32_t j = HASH_INDEX(pNext->hashVal, pHashObj->capacity);
if (j != i) {
pe->num -= 1;
pNode->next = pNext->next;
pNext->next = NULL;
// added into new slot
SHashEntry *pNewEntry = pHashObj->hashList[j];
if (pNewEntry->num == 0) {
assert(pNewEntry->next == NULL);
} else {
assert(pNewEntry->next != NULL);
}
pushfrontNodeInEntryList(pNewEntry, pNext);
} else {
pNode = pNext;
}
}
if (pe->num == 0) {
assert(pe->next == NULL);
} else {
assert(pe->next != NULL);
}
}
}
int64_t et = taosGetTimestampUs();
uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize);
if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
pNewNode->keyLen = (uint32_t)keyLen;
pNewNode->hashVal = hashVal;
pNewNode->dataLen = (uint32_t) dsize;
pNewNode->count = 1;
pNewNode->removed = 0;
pNewNode->next = NULL;
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
return pNewNode;
}
void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) {
assert(pNode != NULL && pEntry != NULL);
pNode->next = pEntry->next;
pEntry->next = pNode;
pEntry->num += 1;
}
size_t taosHashGetMemSize(const SHashObj *pHashObj) {
if (pHashObj == NULL) {
return 0;
}
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
}
FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) {
SHashNode * node = GET_HASH_PNODE(data);
return GET_HASH_NODE_KEY(node);
}
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {
SHashNode * node = GET_HASH_PNODE(data);
return node->keyLen;
}
// release the pNode, return next pNode, and lock the current entry
static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
SHashNode *pOld = (SHashNode *)GET_HASH_PNODE(p);
SHashNode *prevNode = NULL;
*slot = HASH_INDEX(pOld->hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[*slot];
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
SHashNode *pNode = pe->next;
while (pNode) {
if (pNode == pOld)
break;
prevNode = pNode;
pNode = pNode->next;
}
if (pNode) {
pNode = pNode->next;
while (pNode) {
if (pNode->removed == 0) break;
pNode = pNode->next;
}
pOld->count--;
if (pOld->count <=0) {
if (prevNode) {
prevNode->next = pOld->next;
} else {
pe->next = pOld->next;
}
pe->num--;
atomic_sub_fetch_64(&pHashObj->size, 1);
FREE_HASH_NODE(pHashObj, pOld);
}
} else {
uError("pNode:%p data:%p is not there!!!", pNode, p);
}
return pNode;
}
void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pHashObj == NULL) return NULL;
int slot = 0;
char *data = NULL;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
SHashNode *pNode = NULL;
if (p) {
pNode = taosHashReleaseNode(pHashObj, p, &slot);
if (pNode == NULL) {
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
slot = slot + 1;
}
}
if (pNode == NULL) {
for (; slot < pHashObj->capacity; ++slot) {
SHashEntry *pe = pHashObj->hashList[slot];
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
pNode = pe->next;
while (pNode) {
if (pNode->removed == 0) break;
pNode = pNode->next;
}
if (pNode) break;
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
}
}
if (pNode) {
SHashEntry *pe = pHashObj->hashList[slot];
pNode->count++;
data = GET_HASH_NODE_DATA(pNode);
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
}
void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
if (pHashObj == NULL || p == NULL) return;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int slot;
taosHashReleaseNode(pHashObj, p, &slot);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
}

View File

@ -19,7 +19,7 @@
#include "tlog.h"
#include "tnote.h"
#include "tutil.h"
#define MAX_LOGLINE_SIZE (1000)
#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10)
#define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100)
@ -85,6 +85,8 @@ int64_t dbgWSize = 0;
char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/power";
#elif (_TD_TQ_ == true)
char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/tq";
#elif (_TD_PRO_ == true)
char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/ProDB";
#else
char tsLogDir[PATH_MAX] = "/var/log/taos";
#endif
@ -134,11 +136,11 @@ void taosCloseLog() {
// taosCloseLog();
}
static bool taosLockFile(int32_t fd) {
static bool taosLockLogFile(int32_t fd) {
if (fd < 0) return false;
if (tsLogObj.fileNum > 1) {
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
int32_t ret = taosUnLockFile(fd);
if (ret == 0) {
return true;
}
@ -147,11 +149,11 @@ static bool taosLockFile(int32_t fd) {
return false;
}
static void taosUnLockFile(int32_t fd) {
static void taosUnLockLogFile(int32_t fd) {
if (fd < 0) return;
if (tsLogObj.fileNum > 1) {
flock(fd, LOCK_UN | LOCK_NB);
taosUnLockFile(fd);
}
}
@ -183,9 +185,9 @@ static void *taosThreadToOpenNewFile(void *param) {
char name[LOG_FILE_NAME_LEN + 20];
sprintf(name, "%s.%d", tsLogObj.logName, tsLogObj.flag);
umask(0);
taosUmaskFile(0);
int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t fd = taosOpenFileTruncCreateWrite(name);
if (fd < 0) {
tsLogObj.openInProgress = 0;
tsLogObj.lines = tsLogObj.maxLines - 1000;
@ -193,8 +195,8 @@ static void *taosThreadToOpenNewFile(void *param) {
return NULL;
}
taosLockFile(fd);
(void)lseek(fd, 0, SEEK_SET);
taosLockLogFile(fd);
(void)taosLSeekFile(fd, 0, SEEK_SET);
int32_t oldFd = tsLogObj.logHandle->fd;
tsLogObj.logHandle->fd = fd;
@ -246,7 +248,7 @@ void taosResetLog() {
}
static bool taosCheckFileIsOpen(char *logFileName) {
int32_t fd = open(logFileName, O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t fd = taosOpenFileWrite(logFileName);
if (fd < 0) {
if (errno == ENOENT) {
return false;
@ -256,12 +258,12 @@ static bool taosCheckFileIsOpen(char *logFileName) {
}
}
if (taosLockFile(fd)) {
taosUnLockFile(fd);
taosClose(fd);
if (taosLockLogFile(fd)) {
taosUnLockLogFile(fd);
taosCloseFile(fd);
return false;
} else {
taosClose(fd);
taosCloseFile(fd);
return true;
}
}
@ -298,9 +300,9 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
maxFileNum = 1;
#endif
char name[LOG_FILE_NAME_LEN + 50] = "\0";
struct stat logstat0, logstat1;
int32_t size;
char name[LOG_FILE_NAME_LEN + 50] = "\0";
int32_t logstat0_mtime, logstat1_mtime;
int32_t size;
tsLogObj.maxLines = maxLines;
tsLogObj.fileNum = maxFileNum;
@ -310,14 +312,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
strcpy(name, fn);
strcat(name, ".0");
}
bool log0Exist = stat(name, &logstat0) >= 0;
bool log0Exist = taosStatFile(name, NULL, &logstat0_mtime) >= 0;
if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn);
strcat(name, ".1");
}
bool log1Exist = stat(name, &logstat1) >= 0;
bool log1Exist = taosStatFile(name, NULL, &logstat1_mtime) >= 0;
// if none of the log files exist, open 0, if both exists, open the old one
if (!log0Exist && !log1Exist) {
tsLogObj.flag = 0;
@ -326,39 +328,39 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
} else if (!log0Exist) {
tsLogObj.flag = 1;
} else {
tsLogObj.flag = (logstat0.st_mtime > logstat1.st_mtime) ? 0 : 1;
tsLogObj.flag = (logstat0_mtime > logstat1_mtime) ? 0 : 1;
}
char fileName[LOG_FILE_NAME_LEN + 50] = "\0";
sprintf(fileName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
pthread_mutex_init(&tsLogObj.logMutex, NULL);
umask(0);
tsLogObj.logHandle->fd = open(fileName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
taosUmaskFile(0);
tsLogObj.logHandle->fd = taosOpenFileCreateWrite(fileName);
if (tsLogObj.logHandle->fd < 0) {
printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno));
return -1;
}
taosLockFile(tsLogObj.logHandle->fd);
taosLockLogFile(tsLogObj.logHandle->fd);
// only an estimate for number of lines
struct stat filestat;
if (fstat(tsLogObj.logHandle->fd, &filestat) < 0) {
int64_t filesize = 0;
if (taosFStatFile(tsLogObj.logHandle->fd, &filesize, NULL) < 0) {
printf("\nfailed to fstat log file:%s, reason:%s\n", fileName, strerror(errno));
return -1;
}
size = (int32_t)filestat.st_size;
size = (int32_t)filesize;
tsLogObj.lines = size / 60;
lseek(tsLogObj.logHandle->fd, 0, SEEK_END);
taosLSeekFile(tsLogObj.logHandle->fd, 0, SEEK_END);
sprintf(name, "==================================================\n");
taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, " new log file \n");
taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, "==================================================\n");
taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
return 0;
}
@ -377,7 +379,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
struct timeval timeSecs;
time_t curTime;
gettimeofday(&timeSecs, NULL);
taosGetTimeOfDay(&timeSecs);
curTime = timeSecs.tv_sec;
ptm = localtime_r(&curTime, &Tm);
@ -408,7 +410,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
taosWrite(tsLogObj.logHandle->fd, buffer, len);
taosWriteFile(tsLogObj.logHandle->fd, buffer, len);
}
if (tsLogObj.maxLines > 0) {
@ -419,7 +421,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
}
if (dflag & DEBUG_SCREEN)
taosWrite(1, buffer, (uint32_t)len);
taosWriteFile(1, buffer, (uint32_t)len);
if (dflag == 255) nInfo(buffer, len);
}
@ -439,7 +441,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
pos += 3;
if (c >= 16) {
temp[pos++] = '\n';
taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
taosWriteFile(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
c = 0;
pos = 0;
}
@ -447,7 +449,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
temp[pos++] = '\n';
taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
taosWriteFile(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
}
void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) {
@ -464,7 +466,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
struct timeval timeSecs;
time_t curTime;
gettimeofday(&timeSecs, NULL);
taosGetTimeOfDay(&timeSecs);
curTime = timeSecs.tv_sec;
ptm = localtime_r(&curTime, &Tm);
@ -485,7 +487,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
taosWrite(tsLogObj.logHandle->fd, buffer, len);
taosWriteFile(tsLogObj.logHandle->fd, buffer, len);
}
if (tsLogObj.maxLines > 0) {
@ -495,7 +497,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
}
}
if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
if (dflag & DEBUG_SCREEN) taosWriteFile(1, buffer, (uint32_t)len);
}
#if 0
@ -506,8 +508,8 @@ void taosCloseLog() {
static void taosCloseLogByFd(int32_t fd) {
if (fd >= 0) {
taosUnLockFile(fd);
taosClose(fd);
taosUnLockLogFile(fd);
taosCloseFile(fd);
}
}
@ -644,12 +646,12 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
}
if (start < end) {
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
} else {
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
}
dbgWN++;