Merge branch '3.0' of https://github.com/taosdata/TDengine into TD-20251
This commit is contained in:
commit
0e1f0106b7
|
@ -60,7 +60,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
|
|||
为了在 Ubuntu/Debian 系统上编译 [taos-tools](https://github.com/taosdata/taos-tools) 需要安装如下软件:
|
||||
|
||||
```bash
|
||||
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config
|
||||
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config
|
||||
```
|
||||
|
||||
### CentOS 7.9
|
||||
|
@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
|
|||
|
||||
|
||||
```
|
||||
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
|
||||
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
|
||||
```
|
||||
|
||||
#### CentOS 8/Rocky Linux
|
||||
|
@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco
|
|||
sudo yum install -y epel-release
|
||||
sudo yum install -y dnf-plugins-core
|
||||
sudo yum config-manager --set-enabled powertools
|
||||
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
|
||||
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
|
||||
```
|
||||
|
||||
注意:由于 snappy 缺乏 pkg-config 支持(参考 [链接](https://github.com/google/snappy/pull/86)),会导致 cmake 提示无法发现 libsnappy,实际上工作正常。
|
||||
|
|
|
@ -62,7 +62,7 @@ sudo apt-get install -y gcc cmake build-essential git libssl-dev
|
|||
To build the [taosTools](https://github.com/taosdata/taos-tools) on Ubuntu/Debian, the following packages need to be installed.
|
||||
|
||||
```bash
|
||||
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev pkg-config
|
||||
sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config
|
||||
```
|
||||
|
||||
### CentOS 7.9
|
||||
|
@ -85,7 +85,7 @@ sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
|
|||
#### CentOS 7.9
|
||||
|
||||
```
|
||||
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
|
||||
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
|
||||
```
|
||||
|
||||
#### CentOS 8/Rocky Linux
|
||||
|
@ -94,7 +94,7 @@ sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgco
|
|||
sudo yum install -y epel-release
|
||||
sudo yum install -y dnf-plugins-core
|
||||
sudo yum config-manager --set-enabled powertools
|
||||
sudo yum install -y zlib-devel xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libstdc++-static openssl-devel
|
||||
sudo yum install -y zlib-devel zlib-static xz-devel snappy-devel jansson jansson-devel pkgconfig libatomic libatomic-static libstdc++-static openssl-devel
|
||||
```
|
||||
|
||||
Note: Since snappy lacks pkg-config support (refer to [link](https://github.com/google/snappy/pull/86)), it leads a cmake prompt libsnappy not found. But snappy still works well.
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG e62c5ea
|
||||
GIT_TAG ac69142
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -70,6 +70,11 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
|||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL);
|
||||
}
|
||||
|
||||
static inline bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER;
|
||||
}
|
||||
|
||||
/* ------------------------ OTHER DEFINITIONS ------------------------ */
|
||||
// IE type
|
||||
#define TSDB_IE_TYPE_SEC 1
|
||||
|
|
|
@ -234,7 +234,6 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
|
||||
|
@ -265,7 +264,8 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_DELETE,
|
||||
QUERY_NODE_PHYSICAL_SUBPLAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN
|
||||
QUERY_NODE_PHYSICAL_PLAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
|
||||
} ENodeType;
|
||||
|
||||
/**
|
||||
|
|
|
@ -47,6 +47,7 @@ extern "C" {
|
|||
|
||||
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
||||
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
||||
#define SYNC_SNAP_RESEND_MS 1000 * 60
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
|
|
|
@ -27,6 +27,7 @@ extern "C" {
|
|||
|
||||
#if !defined(WINDOWS)
|
||||
#include <dirent.h>
|
||||
#include <execinfo.h>
|
||||
#include <libgen.h>
|
||||
#include <sched.h>
|
||||
#include <unistd.h>
|
||||
|
|
|
@ -120,12 +120,6 @@ void syslog(int unused, const char *format, ...);
|
|||
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
|
||||
#define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2))
|
||||
|
||||
#ifndef NDEBUG
|
||||
#define ASSERT(x) assert(x)
|
||||
#else
|
||||
#define ASSERT(x)
|
||||
#endif
|
||||
|
||||
#ifndef UNUSED
|
||||
#define UNUSED(x) ((void)(x))
|
||||
#endif
|
||||
|
|
|
@ -62,6 +62,7 @@ int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen);
|
|||
bool taosIsDir(const char *dirname);
|
||||
char *taosDirName(char *dirname);
|
||||
char *taosDirEntryBaseName(char *dirname);
|
||||
void taosGetCwd(char *buf, int32_t len);
|
||||
|
||||
TdDirPtr taosOpenDir(const char *dirname);
|
||||
TdDirEntryPtr taosReadDir(TdDirPtr pDir);
|
||||
|
|
|
@ -62,7 +62,7 @@ typedef int32_t TdUcs4;
|
|||
int32_t taosUcs4len(TdUcs4 *ucs4);
|
||||
int64_t taosStr2int64(const char *str);
|
||||
|
||||
void taosConvInit(void);
|
||||
int32_t taosConvInit(void);
|
||||
void taosConvDestroy();
|
||||
int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs);
|
||||
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len);
|
||||
|
|
|
@ -46,6 +46,29 @@ void taosSetTerminalMode();
|
|||
int32_t taosGetOldTerminalMode();
|
||||
void taosResetTerminalMode();
|
||||
|
||||
#if !defined(WINDOWS)
|
||||
#define taosPrintTrace(flags, level, dflag) \
|
||||
{ \
|
||||
void* array[100]; \
|
||||
int32_t size = backtrace(array, 100); \
|
||||
char** strings = backtrace_symbols(array, size); \
|
||||
if (strings != NULL) { \
|
||||
taosPrintLog(flags, level, dflag, "obtained %d stack frames", size); \
|
||||
for (int32_t i = 0; i < size; i++) { \
|
||||
taosPrintLog(flags, level, dflag, "frame:%d, %s", i, strings[i]); \
|
||||
} \
|
||||
} \
|
||||
\
|
||||
taosMemoryFree(strings); \
|
||||
}
|
||||
#else
|
||||
#define taosPrintTrace(flags, level, dflag) \
|
||||
{ \
|
||||
taosPrintLog(flags, level, dflag, \
|
||||
"backtrace not implemented on windows, so detailed stack information cannot be printed"); \
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#define _TD_UTIL_CODING_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
|
|
@ -307,8 +307,9 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MIN_DURATION_PER_FILE 60 // unit minute
|
||||
#define TSDB_MAX_DURATION_PER_FILE (3650 * 1440)
|
||||
#define TSDB_DEFAULT_DURATION_PER_FILE (10 * 1440)
|
||||
#define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute
|
||||
#define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved.
|
||||
#define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute
|
||||
#define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved.
|
||||
#define TSDB_MAX_KEEP_NS (365 * 292 * 1440) // data in db to be reserved.
|
||||
#define TSDB_DEFAULT_KEEP (3650 * 1440) // ten years
|
||||
#define TSDB_MIN_MINROWS_FBLOCK 10
|
||||
#define TSDB_MAX_MINROWS_FBLOCK 1000
|
||||
|
|
|
@ -38,6 +38,7 @@ typedef void (*LogFp)(int64_t ts, ELogLevel level, const char *content);
|
|||
|
||||
extern bool tsLogEmbedded;
|
||||
extern bool tsAsyncLog;
|
||||
extern bool tsAssert;
|
||||
extern int32_t tsNumOfLogLines;
|
||||
extern int32_t tsLogKeepDays;
|
||||
extern LogFp tsLogFp;
|
||||
|
@ -82,6 +83,10 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
|
|||
#endif
|
||||
;
|
||||
|
||||
bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...);
|
||||
#define ASSERTS(condition, ...) taosAssert(condition, __FILE__, __LINE__, __VA_ARGS__)
|
||||
#define ASSERT(condition) ASSERTS(condition, "assert info not provided")
|
||||
|
||||
// clang-format off
|
||||
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
|
||||
|
|
|
@ -407,7 +407,9 @@ void taos_init_imp(void) {
|
|||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
taosConvInit();
|
||||
if (taosConvInit() != 0) {
|
||||
ASSERTS(0, "failed to init conv");
|
||||
}
|
||||
|
||||
rpcInit();
|
||||
|
||||
|
|
|
@ -796,9 +796,10 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
|||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
||||
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code));
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pWrapper->pCatalogReq->forceUpdate = false;
|
||||
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
}
|
||||
|
||||
|
|
|
@ -875,6 +875,7 @@ void tmqFreeImpl(void* handle) {
|
|||
tmq_t* tmq = (tmq_t*)handle;
|
||||
|
||||
// TODO stop timer
|
||||
tmqClearUnhandleMsg(tmq);
|
||||
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
||||
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
||||
if (tmq->qall) taosFreeQall(tmq->qall);
|
||||
|
@ -884,8 +885,7 @@ void tmqFreeImpl(void* handle) {
|
|||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
||||
taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||
taosArrayDestroy(pTopic->vgs);
|
||||
}
|
||||
taosArrayDestroy(tmq->clientTopics);
|
||||
|
@ -1304,7 +1304,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
|||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
||||
taosArrayDestroy(pTopic->vgs);
|
||||
}
|
||||
taosArrayDestroy(tmq->clientTopics);
|
||||
|
@ -1410,7 +1409,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
|||
return -1;
|
||||
}
|
||||
void* pReq = taosMemoryCalloc(1, tlen);
|
||||
if (tlen < 0) {
|
||||
if (pReq == NULL) {
|
||||
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1738,7 +1737,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
taosFreeQitem(pollRspWrapper);
|
||||
return pRsp;
|
||||
} else {
|
||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
|
|
|
@ -333,6 +333,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
|
|||
if (cfgAddTimezone(pCfg, "timezone", tsTimezoneStr) != 0) return -1;
|
||||
if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1;
|
||||
if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "assert", 1, 1) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1;
|
||||
|
||||
|
@ -693,6 +694,8 @@ static void taosSetSystemCfg(SConfig *pCfg) {
|
|||
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
|
||||
taosSetCoreDump(enableCore);
|
||||
|
||||
tsAssert = cfgGetItem(pCfg, "assert")->bval;
|
||||
|
||||
// todo
|
||||
tsVersion = 30000000;
|
||||
}
|
||||
|
@ -788,6 +791,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
case 'a': {
|
||||
if (strcasecmp("asyncLog", name) == 0) {
|
||||
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
|
||||
} else if (strcasecmp("assert", name) == 0) {
|
||||
tsAssert = cfgGetItem(pCfg, "assert")->bval;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
#undef TD_MSG_SEG_CODE_
|
||||
#include "tmsgdef.h"
|
||||
|
||||
#include "tlog.h"
|
||||
|
||||
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "trow.h"
|
||||
#include "tlog.h"
|
||||
|
||||
const uint8_t tdVTypeByte[2][3] = {{
|
||||
// 2 bits
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "ttime.h"
|
||||
|
||||
#include "tlog.h"
|
||||
|
||||
/*
|
||||
* mktime64 - Converts date to seconds.
|
||||
* Converts Gregorian date to seconds since 1970-01-01 00:00:00.
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "dmMgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tconfig.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
// clang-format off
|
||||
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
||||
|
@ -45,9 +46,30 @@ static struct {
|
|||
SArray *pArgs; // SConfigPair
|
||||
} global = {0};
|
||||
|
||||
static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); }
|
||||
static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143, true); }
|
||||
static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert = 1; }
|
||||
|
||||
static void dmStopDnode(int signum, void *sigInfo, void *context) {
|
||||
// taosIgnSignal(SIGUSR1);
|
||||
// taosIgnSignal(SIGUSR2);
|
||||
taosIgnSignal(SIGTERM);
|
||||
taosIgnSignal(SIGHUP);
|
||||
taosIgnSignal(SIGINT);
|
||||
taosIgnSignal(SIGABRT);
|
||||
taosIgnSignal(SIGBREAK);
|
||||
|
||||
dInfo("shut down signal is %d", signum);
|
||||
#ifndef WINDOWS
|
||||
dInfo("sender PID:%d cmdline:%s", ((siginfo_t *)sigInfo)->si_pid,
|
||||
taosGetCmdlineByPID(((siginfo_t *)sigInfo)->si_pid));
|
||||
#endif
|
||||
|
||||
dmStop();
|
||||
}
|
||||
|
||||
static void dmSetSignalHandle() {
|
||||
taosSetSignal(SIGUSR1, dmSetDebugFlag);
|
||||
taosSetSignal(SIGUSR2, dmSetAssert);
|
||||
taosSetSignal(SIGTERM, dmStopDnode);
|
||||
taosSetSignal(SIGHUP, dmStopDnode);
|
||||
taosSetSignal(SIGINT, dmStopDnode);
|
||||
|
@ -105,6 +127,19 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dmPrintArgs(int32_t argc, char const *argv[]) {
|
||||
char path[1024] = {0};
|
||||
taosGetCwd(path, sizeof(path));
|
||||
|
||||
char args[1024] = {0};
|
||||
int32_t arglen = snprintf(args, sizeof(args), "%s", argv[0]);
|
||||
for (int32_t i = 1; i < argc; ++i) {
|
||||
arglen = arglen + snprintf(args + arglen, sizeof(args) - arglen, " %s", argv[i]);
|
||||
}
|
||||
|
||||
dInfo("startup path:%s args:%s", path, args);
|
||||
}
|
||||
|
||||
static void dmGenerateGrant() { mndGenerateMachineCode(); }
|
||||
|
||||
static void dmPrintVersion() {
|
||||
|
@ -194,6 +229,8 @@ int mainWindows(int argc, char **argv) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
dmPrintArgs(argc, argv);
|
||||
|
||||
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
|
||||
dError("failed to start since read config error");
|
||||
taosCloseLog();
|
||||
|
@ -201,7 +238,12 @@ int mainWindows(int argc, char **argv) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
taosConvInit();
|
||||
if (taosConvInit() != 0) {
|
||||
dError("failed to init conv");
|
||||
taosCloseLog();
|
||||
taosCleanupArgs();
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (global.dumpConfig) {
|
||||
dmDumpCfg();
|
||||
|
|
|
@ -139,7 +139,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
|
||||
SSnode *pSnode = pMgmt->pSnode;
|
||||
if (pSnode == NULL) {
|
||||
dError("snode: msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
|
||||
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
|
||||
TMSG_INFO(pMsg->msgType), qtype);
|
||||
taosFreeQitem(pMsg);
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
|
@ -161,7 +161,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
smPutNodeMsgToWriteQueue(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
ASSERTS(0, "msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
|
||||
TMSG_INFO(pMsg->msgType), qtype);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -301,6 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) {
|
|||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.parent = pDnode;
|
||||
rpcInit.compressSize = tsCompressMsgSize;
|
||||
|
||||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->serverRpc == NULL) {
|
||||
|
|
|
@ -825,7 +825,13 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
dbObj.cfgVersion++;
|
||||
dbObj.updateTime = taosGetTimestampMs();
|
||||
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
if (dbObj.cfg.replications != pDb->cfg.replications) {
|
||||
// return quickly, operation executed asynchronously
|
||||
mInfo("db:%s, alter db replica from %d to %d", pDb->name, pDb->cfg.replications, dbObj.cfg.replications);
|
||||
} else {
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -119,7 +119,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
|
|||
}
|
||||
|
||||
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||
int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
||||
int32_t code = 0;
|
||||
if (!syncUtilUserCommit(pMsg->msgType)) {
|
||||
goto _out;
|
||||
}
|
||||
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
||||
|
||||
_out:
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
return code;
|
||||
|
|
|
@ -174,7 +174,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
|
|||
void tsdbReaderClose(STsdbReader *pReader);
|
||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockSMA, bool *allHave);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave);
|
||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
|
|
|
@ -454,7 +454,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
|||
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||
memcpy(pBuf + sizeof(suid), pNode->data, keyLen);
|
||||
memcpy(&pBuf[1], pNode->data, keyLen);
|
||||
|
||||
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
||||
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
|
||||
|
|
|
@ -1353,6 +1353,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (stbEntry.stbEntry.schemaTag.pSchema == NULL) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
|
||||
|
||||
STagVal tagVal = {.cid = pTagColumn->colId};
|
||||
|
|
|
@ -952,7 +952,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
|
||||
|
||||
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
|
||||
if (code) goto _err;
|
||||
if (code) {
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ typedef struct {
|
|||
// --------------
|
||||
TSKEY nextKey; // reset by each table commit
|
||||
int32_t commitFid;
|
||||
int32_t expLevel;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
// commit file data
|
||||
|
@ -503,6 +504,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
|
||||
// memory
|
||||
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
|
||||
pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
|
||||
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
|
||||
&pCommitter->maxKey);
|
||||
#if 0
|
||||
|
@ -556,7 +558,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
}
|
||||
} else {
|
||||
SDiskID did = {0};
|
||||
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
|
||||
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
|
||||
wSet.diskId = did;
|
||||
wSet.nSttF = 1;
|
||||
|
|
|
@ -962,6 +962,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
|||
}
|
||||
}
|
||||
|
||||
pDFileSet->diskId = pSet->diskId;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4112,8 +4112,9 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) {
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
|
||||
int32_t code = 0;
|
||||
SColumnDataAgg ***pBlockSMA = &pDataBlock->pBlockAgg;
|
||||
*allHave = false;
|
||||
|
||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||
|
@ -4161,6 +4162,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
|
|||
int32_t i = 0, j = 0;
|
||||
size_t size = taosArrayGetSize(pSup->pColAgg);
|
||||
|
||||
// ensure capacity
|
||||
if(pDataBlock->pDataBlock) {
|
||||
size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
taosArrayEnsureCap(pSup->pColAgg, colsNum);
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
if (pResBlock->pBlockAgg == NULL) {
|
||||
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
||||
|
|
|
@ -190,9 +190,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
version);
|
||||
|
||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||
ASSERT(pVnode->state.applied + 1 == version);
|
||||
|
||||
pVnode->state.applied = version;
|
||||
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
|
||||
|
||||
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
|
||||
|
||||
// skip header
|
||||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
|
|
@ -547,6 +547,14 @@ typedef struct SCtgOperation {
|
|||
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
|
||||
#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
|
||||
|
||||
#define CTG_LOCK_DEBUG(...) \
|
||||
do { \
|
||||
if (gCTGDebug.lockEnable) { \
|
||||
|
|
|
@ -1094,6 +1094,9 @@ _return:
|
|||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
||||
}
|
||||
if (pTask->res || code) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
}
|
||||
|
@ -1124,7 +1127,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
@ -1144,7 +1147,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
@ -1162,7 +1165,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
|
@ -1180,7 +1183,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
|
||||
ctgTaskError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
@ -1190,7 +1193,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
|
||||
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
|
||||
|
@ -1207,11 +1210,11 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
STableMeta* stbMeta = NULL;
|
||||
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta);
|
||||
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
||||
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
exist = 1;
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
} else {
|
||||
ctgDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
}
|
||||
|
@ -1225,7 +1228,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
break;
|
||||
}
|
||||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
ctgTaskError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
|
@ -1280,6 +1283,7 @@ _return:
|
|||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
}
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pTask->res && taskDone) {
|
||||
|
|
|
@ -224,7 +224,7 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA
|
|||
|
||||
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||
bool allColumnsHaveAgg = true;
|
||||
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pBlock->pBlockAgg, &allColumnsHaveAgg);
|
||||
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
|
|
@ -95,6 +95,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
|
|||
// TODO: optimize to ignore null values for linear interpolation.
|
||||
if (!pLinearInfo->isStartSet) {
|
||||
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
||||
ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type));
|
||||
|
||||
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
||||
memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
|
||||
}
|
||||
|
|
|
@ -863,19 +863,20 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
|||
|
||||
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
||||
SArray* res = (SArray*)data;
|
||||
SWinKey* pos = taosArrayGet(res, index);
|
||||
SResKeyPos* pData = (SResKeyPos*)pKey;
|
||||
if (*(int64_t*)pData->key == pos->ts) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (*(int64_t*)pData->key > pos->ts) {
|
||||
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||
SResKeyPos* pRKey = (SResKeyPos*)pKey;
|
||||
if (pRKey->groupId > pDataPos->groupId) {
|
||||
return 1;
|
||||
} else if (pRKey->groupId < pDataPos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
|
||||
if (*(int64_t*)pRKey->key > pDataPos->ts) {
|
||||
return 1;
|
||||
} else if (*(int64_t*)pRKey->key < pDataPos->ts){
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
||||
|
@ -1400,19 +1401,21 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
|||
|
||||
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
||||
SArray* res = (SArray*)data;
|
||||
SWinKey* pos = taosArrayGet(res, index);
|
||||
SWinKey* pData = (SWinKey*)pKey;
|
||||
if (pData->ts == pos->ts) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (pData->ts > pos->ts) {
|
||||
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||
SWinKey* pWKey = (SWinKey*)pKey;
|
||||
|
||||
if (pWKey->groupId > pDataPos->groupId) {
|
||||
return 1;
|
||||
} else if (pWKey->groupId < pDataPos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
|
||||
if (pWKey->ts > pDataPos->ts) {
|
||||
return 1;
|
||||
} else if (pWKey->ts < pDataPos->ts) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "taoserror.h"
|
||||
#include "tdef.h"
|
||||
#include "tpagedbuf.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define LHASH_CAP_RATIO 0.85
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "tpagedbuf.h"
|
||||
#include "tpercentile.h"
|
||||
#include "ttypes.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define DEFAULT_NUM_OF_SLOT 1024
|
||||
|
||||
|
@ -367,11 +368,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
|
|||
pSlot->info.data = NULL;
|
||||
}
|
||||
|
||||
SArray *pPageIdList = (SArray *)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||
if (pPageIdList == NULL) {
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES);
|
||||
pPageIdList = pList;
|
||||
SArray *pPageIdList;
|
||||
void *p = taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||
if (p == NULL) {
|
||||
pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pPageIdList, POINTER_BYTES);
|
||||
} else {
|
||||
pPageIdList = *(SArray **)p;
|
||||
}
|
||||
|
||||
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
|
||||
|
|
|
@ -88,11 +88,13 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
|
|||
}
|
||||
#ifdef WINDOWS
|
||||
if (strlen(path) == 0) {
|
||||
strcat(path, "udfd.exe");
|
||||
} else {
|
||||
strcat(path, "\\udfd.exe");
|
||||
strcat(path, "C:\\TDengine");
|
||||
}
|
||||
strcat(path, "\\udfd.exe");
|
||||
#else
|
||||
if (strlen(path) == 0) {
|
||||
strcat(path, "/usr/bin");
|
||||
}
|
||||
strcat(path, "/udfd");
|
||||
#endif
|
||||
char *argsUdfd[] = {path, "-c", configDir, NULL};
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SInsertParseContext {
|
|||
SParsedDataColInfo tags; // for stmt
|
||||
bool missCache;
|
||||
bool usingDuplicateTable;
|
||||
bool forceUpdate;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||
|
@ -829,6 +830,11 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
|||
}
|
||||
|
||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
|
@ -844,6 +850,11 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt
|
|||
}
|
||||
|
||||
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
|
@ -1909,6 +1920,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)
|
||||
};
|
||||
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
|
|
|
@ -3879,12 +3879,17 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
|
|||
pOptions->keep[2] = getBigintFromValueNode((SValueNode*)nodesListGetNode(pOptions->pKeep, 2));
|
||||
}
|
||||
|
||||
int64_t tsdbMaxKeep = TSDB_MAX_KEEP;
|
||||
if (pOptions->precision == TSDB_TIME_PRECISION_NANO) {
|
||||
tsdbMaxKeep = TSDB_MAX_KEEP_NS;
|
||||
}
|
||||
|
||||
if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP ||
|
||||
pOptions->keep[0] > TSDB_MAX_KEEP || pOptions->keep[1] > TSDB_MAX_KEEP || pOptions->keep[2] > TSDB_MAX_KEEP) {
|
||||
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
||||
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
||||
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
|
||||
TSDB_MAX_KEEP);
|
||||
tsdbMaxKeep);
|
||||
}
|
||||
|
||||
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
||||
|
@ -4036,7 +4041,10 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
|||
TSDB_MAX_MINROWS_FBLOCK);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbKeepOption(pCxt, pOptions);
|
||||
code = checkDbPrecisionOption(pCxt, pOptions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
||||
|
@ -4049,9 +4057,6 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
|||
code = checkDbRangeOption(pCxt, "tsdbPagesize", pOptions->tsdbPageSize, TSDB_MIN_TSDB_PAGESIZE,
|
||||
TSDB_MAX_TSDB_PAGESIZE);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbPrecisionOption(pCxt, pOptions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbEnumOption(pCxt, "replications", pOptions->replica, TSDB_MIN_DB_REPLICA, TSDB_MAX_DB_REPLICA);
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SSyncSnapshotSender {
|
|||
SyncTerm term;
|
||||
int64_t startTime;
|
||||
int64_t endTime;
|
||||
int64_t lastSendTime;
|
||||
bool finish;
|
||||
|
||||
// init when create
|
||||
|
|
|
@ -79,7 +79,6 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len);
|
|||
void syncUtilMsgHtoN(void* msg);
|
||||
void syncUtilMsgNtoH(void* msg);
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType);
|
||||
bool syncUtilUserCommit(tmsg_t msgType);
|
||||
bool syncUtilUserRollback(tmsg_t msgType);
|
||||
|
||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
|
||||
|
|
|
@ -84,7 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||
ASSERT(false && "deprecated");
|
||||
ASSERTS(false, "deprecated");
|
||||
if (pSyncNode == NULL) {
|
||||
sError("pSyncNode is NULL");
|
||||
return;
|
||||
|
|
|
@ -791,9 +791,9 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
}
|
||||
|
||||
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
|
||||
ASSERT(pNode->pLogStore != NULL && "log store not created");
|
||||
ASSERT(pNode->pFsm != NULL && "pFsm not registered");
|
||||
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
|
||||
ASSERTS(pNode->pLogStore != NULL, "log store not created");
|
||||
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
||||
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
||||
SSnapshot snapshot;
|
||||
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
|
||||
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
|
||||
|
@ -1144,8 +1144,8 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->pLogStore != NULL && "log store not created");
|
||||
ASSERT(pSyncNode->pLogBuf != NULL && "ring log buffer not created");
|
||||
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
|
||||
ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
|
||||
|
||||
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
|
||||
|
@ -2663,7 +2663,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
|
||||
int32_t code = syncNodeAppend(ths, pEntry);
|
||||
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
|
||||
ASSERT(false && "failed to append blocking msg");
|
||||
ASSERTS(false, "failed to append blocking msg");
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
|
||||
// initial log buffer with at least one item, e.g. commitIndex
|
||||
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERT(pMatch != NULL && "no matched log entry");
|
||||
ASSERTS(pMatch != NULL, "no matched log entry");
|
||||
ASSERT(pMatch->index + 1 == index);
|
||||
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
||||
|
@ -86,14 +86,14 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
|
|||
|
||||
if (prevIndex >= pBuf->startIndex) {
|
||||
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
|
||||
ASSERT(pEntry != NULL && "no log entry found");
|
||||
ASSERTS(pEntry != NULL, "no log entry found");
|
||||
prevLogTerm = pEntry->term;
|
||||
return prevLogTerm;
|
||||
}
|
||||
|
||||
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
|
||||
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
|
||||
ASSERT(timeMs != 0 && "no log entry found");
|
||||
ASSERTS(timeMs != 0, "no log entry found");
|
||||
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
|
||||
ASSERT(prevIndex == 0 || prevLogTerm != 0);
|
||||
return prevLogTerm;
|
||||
|
@ -141,9 +141,9 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
|
|||
}
|
||||
|
||||
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||
ASSERT(pNode->pLogStore != NULL && "log store not created");
|
||||
ASSERT(pNode->pFsm != NULL && "pFsm not registered");
|
||||
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
|
||||
ASSERTS(pNode->pLogStore != NULL, "log store not created");
|
||||
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
||||
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
||||
|
||||
SSnapshot snapshot;
|
||||
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
|
||||
|
@ -437,7 +437,7 @@ _out:
|
|||
}
|
||||
|
||||
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
|
||||
ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");
|
||||
ASSERTS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM");
|
||||
|
||||
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
|
||||
return 0;
|
||||
|
@ -513,13 +513,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
|
||||
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
pBuf->commitIndex = index;
|
||||
if (!inBuf) {
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||
", role: %d, current term: %" PRId64,
|
||||
|
@ -905,7 +900,7 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
|
|||
ASSERT(pNode->logReplMgrs[i] == NULL);
|
||||
pNode->logReplMgrs[i] = syncLogReplMgrCreate();
|
||||
pNode->logReplMgrs[i]->peerId = i;
|
||||
ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory.");
|
||||
ASSERTS(pNode->logReplMgrs[i] != NULL, "Out of memory.");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
pSender->sendingMS = 0;
|
||||
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
pSender->startTime = taosGetTimestampMs();
|
||||
pSender->lastSendTime = pSender->startTime;
|
||||
pSender->finish = false;
|
||||
|
||||
// build begin msg
|
||||
|
@ -201,6 +202,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
||||
|
||||
pSender->lastSendTime = taosGetTimestampMs();
|
||||
|
||||
// event log
|
||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||
sSTrace(pSender, "snapshot sender finish");
|
||||
|
@ -213,33 +216,36 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
// send snapshot data from cache
|
||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||
// send current block data
|
||||
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
// pMsg->privateTerm = pSender->privateTerm;
|
||||
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
|
||||
|
||||
// send msg
|
||||
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
||||
|
||||
// event log
|
||||
sSTrace(pSender, "snapshot sender resend");
|
||||
}
|
||||
|
||||
// send msg
|
||||
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
||||
|
||||
pSender->lastSendTime = taosGetTimestampMs();
|
||||
|
||||
// event log
|
||||
sSTrace(pSender, "snapshot sender resend");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "syncRaftLog.h"
|
||||
#include "syncReplication.h"
|
||||
#include "syncRespMgr.h"
|
||||
#include "syncSnapshot.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
|
||||
|
@ -70,6 +71,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
|||
}
|
||||
|
||||
int64_t timeNow = taosGetTimestampMs();
|
||||
|
||||
for (int i = 0; i < ths->peersNum; ++i) {
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
|
||||
if (pSender != NULL) {
|
||||
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
|
||||
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
|
||||
snapshotReSend(pSender);
|
||||
} else {
|
||||
sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId,
|
||||
ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
|
||||
// end timeout wal snapshot
|
||||
if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS &&
|
||||
|
|
|
@ -160,8 +160,6 @@ void syncUtilMsgNtoH(void* msg) {
|
|||
|
||||
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||
|
||||
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
||||
|
@ -568,7 +566,7 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
|
|||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
|
||||
"send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
|
||||
", stime:%" PRId64 ", seq:%d}, %s",
|
||||
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
|
||||
}
|
||||
|
@ -595,7 +593,7 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
|
|||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
|
||||
"send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
|
||||
", stime:%" PRId64 ", ack:%d}, %s",
|
||||
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ SyncAppendEntriesReply *createMsg() {
|
|||
pMsg->success = true;
|
||||
pMsg->matchIndex = 77;
|
||||
pMsg->term = 33;
|
||||
pMsg->privateTerm = 44;
|
||||
// pMsg->privateTerm = 44;
|
||||
pMsg->startTime = taosGetTimestampMs();
|
||||
return pMsg;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#include "syncTest.h"
|
||||
#include <gtest/gtest.h>
|
||||
// #include <gtest/gtest.h>
|
||||
|
||||
/*
|
||||
typedef enum {
|
||||
|
|
|
@ -81,6 +81,8 @@ int32_t syncIOQTimerStop();
|
|||
int32_t syncIOPingTimerStart();
|
||||
int32_t syncIOPingTimerStop();
|
||||
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -469,3 +469,5 @@ static void syncIOTickPing(void *param, void *tmrId) {
|
|||
|
||||
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
|
||||
}
|
||||
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry) {}
|
|
@ -1583,8 +1583,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
// snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
// cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include "tlog.h"
|
||||
|
||||
typedef struct SPoolMem {
|
||||
int64_t size;
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include "tlog.h"
|
||||
|
||||
typedef struct SPoolMem {
|
||||
int64_t size;
|
||||
|
|
|
@ -1195,6 +1195,8 @@ void transCloseServer(void* arg) {
|
|||
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
||||
destroyWorkThrd(srv->pThreadObj[i]);
|
||||
}
|
||||
} else {
|
||||
uv_loop_close(srv->loop);
|
||||
}
|
||||
|
||||
taosMemoryFree(srv->pThreadObj);
|
||||
|
|
|
@ -163,7 +163,7 @@ int32_t taosMulMkDir(const char *dirname) {
|
|||
code = mkdir(temp, 0755);
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return code;
|
||||
}
|
||||
*pos = TD_DIRSEP[0];
|
||||
|
@ -179,7 +179,7 @@ int32_t taosMulMkDir(const char *dirname) {
|
|||
code = mkdir(temp, 0755);
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
|
|||
code = mkdir(temp, mode);
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return code;
|
||||
}
|
||||
*pos = TD_DIRSEP[0];
|
||||
|
@ -241,7 +241,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
|
|||
code = mkdir(temp, mode);
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -497,3 +497,11 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
|
|||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
void taosGetCwd(char *buf, int32_t len) {
|
||||
#if !defined(WINDOWS)
|
||||
(void)getcwd(buf, len - 1);
|
||||
#else
|
||||
strncpy(buf, "not implemented on windows", len -1);
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -313,7 +313,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
assert(!(tdFileOptions & TD_FILE_EXCL));
|
||||
fp = fopen(path, mode);
|
||||
if (fp == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
|
@ -336,14 +336,14 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
#endif
|
||||
if (fd == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
|
||||
if (pFile == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (fd >= 0) close(fd);
|
||||
if (fp != NULL) fclose(fp);
|
||||
return NULL;
|
||||
|
|
|
@ -348,7 +348,7 @@ void taosMemoryTrim(int32_t size) {
|
|||
|
||||
void* taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
|
||||
#ifdef USE_TD_MEMORY
|
||||
ASSERT(0);
|
||||
assert(0);
|
||||
#else
|
||||
#if defined(LINUX)
|
||||
void* p = memalign(alignment, size);
|
||||
|
|
|
@ -143,15 +143,17 @@ SConv *gConv = NULL;
|
|||
int32_t convUsed = 0;
|
||||
int32_t gConvMaxNum = 0;
|
||||
|
||||
void taosConvInit(void) {
|
||||
int32_t taosConvInit(void) {
|
||||
gConvMaxNum = 512;
|
||||
gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv));
|
||||
for (int32_t i = 0; i < gConvMaxNum; ++i) {
|
||||
gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
||||
if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void taosConvDestroy() {
|
||||
|
|
|
@ -617,14 +617,14 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
|
|||
return 0;
|
||||
} else {
|
||||
// printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
#elif defined(_TD_DARWIN_64)
|
||||
struct statvfs info;
|
||||
if (statvfs(dataDir, &info)) {
|
||||
// printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
} else {
|
||||
diskSize->total = info.f_blocks * info.f_frsize;
|
||||
|
@ -635,7 +635,7 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
|
|||
#else
|
||||
struct statvfs info;
|
||||
if (statvfs(dataDir, &info)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
} else {
|
||||
diskSize->total = info.f_blocks * info.f_frsize;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "talgo.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define doswap(__left, __right, __size, __buf) \
|
||||
do { \
|
||||
|
|
|
@ -181,7 +181,7 @@ int32_t tjsonGetObjectValueString(const SJson* pJson, char** pValueString) {
|
|||
int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) {
|
||||
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
||||
if (NULL == p) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
strcpy(pVal, p);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -190,7 +190,7 @@ int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) {
|
|||
int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal) {
|
||||
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
||||
if (NULL == p) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
*pVal = strdup(p);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -199,7 +199,7 @@ int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal)
|
|||
int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal) {
|
||||
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
||||
if (NULL == p) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#ifdef WINDOWS
|
||||
sscanf(p, "%" PRId64, pVal);
|
||||
|
@ -233,7 +233,7 @@ int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal
|
|||
int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal) {
|
||||
char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName));
|
||||
if (NULL == p) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#ifdef WINDOWS
|
||||
sscanf(p, "%" PRIu64, pVal);
|
||||
|
@ -259,6 +259,9 @@ int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pV
|
|||
|
||||
int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) {
|
||||
const SJson* pObject = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pObject) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!cJSON_IsBool(pObject)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -268,6 +271,9 @@ int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) {
|
|||
|
||||
int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal) {
|
||||
const SJson* pObject = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pObject) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!cJSON_IsNumber(pObject)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -282,7 +288,7 @@ SJson* tjsonGetArrayItem(const SJson* pJson, int32_t index) { return cJSON_GetAr
|
|||
int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj) {
|
||||
SJson* pJsonObj = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pJsonObj) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return func(pJsonObj, pObj);
|
||||
}
|
||||
|
@ -294,7 +300,7 @@ int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, v
|
|||
|
||||
SJson* pJsonObj = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pJsonObj) {
|
||||
return TSDB_CODE_FAILED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
*pObj = taosMemoryCalloc(1, objSize);
|
||||
if (NULL == *pObj) {
|
||||
|
|
|
@ -72,6 +72,7 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time. */
|
|||
|
||||
bool tsLogEmbedded = 0;
|
||||
bool tsAsyncLog = true;
|
||||
bool tsAssert = true;
|
||||
int32_t tsNumOfLogLines = 10000000;
|
||||
int32_t tsLogKeepDays = 0;
|
||||
LogFp tsLogFp = NULL;
|
||||
|
@ -778,3 +779,37 @@ cmp_end:
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...) {
|
||||
if (condition) return false;
|
||||
|
||||
const char *flags = "UTL FATAL ";
|
||||
ELogLevel level = DEBUG_FATAL;
|
||||
int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag
|
||||
char buffer[LOG_MAX_LINE_BUFFER_SIZE];
|
||||
int32_t len = taosBuildLogHead(buffer, flags);
|
||||
|
||||
va_list argpointer;
|
||||
va_start(argpointer, format);
|
||||
len = len + vsnprintf(buffer + len, LOG_MAX_LINE_BUFFER_SIZE - len, format, argpointer);
|
||||
va_end(argpointer);
|
||||
buffer[len++] = '\n';
|
||||
buffer[len] = 0;
|
||||
taosPrintLogImp(1, 255, buffer, len);
|
||||
|
||||
taosPrintLog(flags, level, dflag, "tAssert at file %s:%d exit:%d", file, line, tsAssert);
|
||||
taosPrintTrace(flags, level, dflag);
|
||||
|
||||
if (tsAssert) {
|
||||
taosCloseLog();
|
||||
taosMsleep(300);
|
||||
|
||||
#ifdef NDEBUG
|
||||
abort();
|
||||
#else
|
||||
assert(0);
|
||||
#endif
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
|
@ -6,6 +6,7 @@
|
|||
#include "taos.h"
|
||||
#include "taosdef.h"
|
||||
#include "thash.h"
|
||||
#include "tlog.h"
|
||||
|
||||
namespace {
|
||||
|
||||
|
|
|
@ -1022,9 +1022,9 @@
|
|||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py
|
||||
#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insert_alltypes_json.py
|
||||
#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/invalid_commandline.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py
|
||||
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py
|
||||
|
|
|
@ -116,6 +116,25 @@ endi
|
|||
|
||||
print ============= step4: alter database
|
||||
sql alter database db replica 3
|
||||
$wt = 0
|
||||
stepwt1:
|
||||
$wt = $wt + 1
|
||||
sleep 1000
|
||||
if $wt == 200 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
print wait 1 seconds to alter
|
||||
goto stepwt1
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
print ---> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data27 $data28 $data29
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data26 $data37 $data38 $data39
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data36 $data47 $data48 $data49
|
||||
|
||||
$leaderIndex = 0
|
||||
|
||||
|
|
|
@ -148,6 +148,26 @@ endi
|
|||
|
||||
print ============= step3: alter database
|
||||
sql alter database db replica 1
|
||||
$wt = 0
|
||||
stepwt1:
|
||||
$wt = $wt + 1
|
||||
sleep 1000
|
||||
if $wt == 200 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
print wait 1 seconds to alter
|
||||
goto stepwt1
|
||||
endi
|
||||
|
||||
sql show db.vgroups
|
||||
print ---> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data27 $data28 $data29
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data26 $data37 $data38 $data39
|
||||
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data36 $data47 $data48 $data49
|
||||
|
||||
$hasleader = 0
|
||||
|
||||
$x = 0
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
#include "tlog.h"
|
||||
|
||||
int smlProcess_influx_Test() {
|
||||
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
|
Loading…
Reference in New Issue