Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0
This commit is contained in:
commit
d716fb3a55
|
@ -20,17 +20,23 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
|
||||||
|
#define SYNC_INDEX_BEGIN 0
|
||||||
|
#define SYNC_INDEX_INVALID -1
|
||||||
|
|
||||||
typedef uint64_t SyncNodeId;
|
typedef uint64_t SyncNodeId;
|
||||||
typedef int32_t SyncGroupId;
|
typedef int32_t SyncGroupId;
|
||||||
typedef int64_t SyncIndex;
|
typedef int64_t SyncIndex;
|
||||||
typedef uint64_t SyncTerm;
|
typedef uint64_t SyncTerm;
|
||||||
|
|
||||||
|
typedef struct SSyncNode SSyncNode;
|
||||||
|
typedef struct SSyncBuffer SSyncBuffer;
|
||||||
|
typedef struct SWal SWal;
|
||||||
|
typedef struct SSyncRaftEntry SSyncRaftEntry;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_STATE_FOLLOWER = 100,
|
TAOS_SYNC_STATE_FOLLOWER = 100,
|
||||||
TAOS_SYNC_STATE_CANDIDATE = 101,
|
TAOS_SYNC_STATE_CANDIDATE = 101,
|
||||||
|
@ -38,6 +44,17 @@ typedef enum {
|
||||||
TAOS_SYNC_STATE_ERROR = 103,
|
TAOS_SYNC_STATE_ERROR = 103,
|
||||||
} ESyncState;
|
} ESyncState;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
||||||
|
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
|
||||||
|
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
|
||||||
|
} ESyncProposeCode;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TAOS_SYNC_FSM_CB_SUCCESS = 0,
|
||||||
|
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
|
||||||
|
} ESyncFsmCbCode;
|
||||||
|
|
||||||
typedef struct SNodeInfo {
|
typedef struct SNodeInfo {
|
||||||
uint16_t nodePort;
|
uint16_t nodePort;
|
||||||
char nodeFqdn[TSDB_FQDN_LEN];
|
char nodeFqdn[TSDB_FQDN_LEN];
|
||||||
|
@ -55,11 +72,6 @@ typedef struct SSnapshot {
|
||||||
SyncTerm lastApplyTerm;
|
SyncTerm lastApplyTerm;
|
||||||
} SSnapshot;
|
} SSnapshot;
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TAOS_SYNC_FSM_CB_SUCCESS = 0,
|
|
||||||
TAOS_SYNC_FSM_CB_OTHER_ERROR,
|
|
||||||
} ESyncFsmCbCode;
|
|
||||||
|
|
||||||
typedef struct SFsmCbMeta {
|
typedef struct SFsmCbMeta {
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
|
@ -68,27 +80,15 @@ typedef struct SFsmCbMeta {
|
||||||
uint64_t seqNum;
|
uint64_t seqNum;
|
||||||
} SFsmCbMeta;
|
} SFsmCbMeta;
|
||||||
|
|
||||||
struct SRpcMsg;
|
|
||||||
typedef struct SRpcMsg SRpcMsg;
|
|
||||||
|
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
void* data;
|
void* data;
|
||||||
|
|
||||||
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
|
||||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
|
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
|
||||||
|
|
||||||
} SSyncFSM;
|
} SSyncFSM;
|
||||||
|
|
||||||
struct SSyncRaftEntry;
|
|
||||||
typedef struct SSyncRaftEntry SSyncRaftEntry;
|
|
||||||
|
|
||||||
#define SYNC_INDEX_BEGIN 0
|
|
||||||
#define SYNC_INDEX_INVALID -1
|
|
||||||
|
|
||||||
// abstract definition of log store in raft
|
// abstract definition of log store in raft
|
||||||
// SWal implements it
|
// SWal implements it
|
||||||
typedef struct SSyncLogStore {
|
typedef struct SSyncLogStore {
|
||||||
|
@ -117,11 +117,6 @@ typedef struct SSyncLogStore {
|
||||||
|
|
||||||
} SSyncLogStore;
|
} SSyncLogStore;
|
||||||
|
|
||||||
struct SWal;
|
|
||||||
typedef struct SWal SWal;
|
|
||||||
|
|
||||||
struct SEpSet;
|
|
||||||
typedef struct SEpSet SEpSet;
|
|
||||||
|
|
||||||
typedef struct SSyncInfo {
|
typedef struct SSyncInfo {
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
|
@ -130,10 +125,8 @@ typedef struct SSyncInfo {
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
SSyncFSM* pFsm;
|
SSyncFSM* pFsm;
|
||||||
SMsgCb* msgcb;
|
SMsgCb* msgcb;
|
||||||
|
|
||||||
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||||
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
||||||
|
|
||||||
} SSyncInfo;
|
} SSyncInfo;
|
||||||
|
|
||||||
int32_t syncInit();
|
int32_t syncInit();
|
||||||
|
@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
|
||||||
SyncTerm syncGetMyTerm(int64_t rid);
|
SyncTerm syncGetMyTerm(int64_t rid);
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
int32_t syncGetVgId(int64_t rid);
|
int32_t syncGetVgId(int64_t rid);
|
||||||
|
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
||||||
typedef enum {
|
bool syncEnvIsStart();
|
||||||
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
|
||||||
TAOS_SYNC_PROPOSE_NOT_LEADER,
|
|
||||||
TAOS_SYNC_PROPOSE_OTHER_ERROR,
|
|
||||||
} ESyncProposeCode;
|
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
|
||||||
|
|
||||||
bool syncEnvIsStart();
|
|
||||||
|
|
||||||
extern int32_t sDebugFlag;
|
|
||||||
|
|
||||||
//-----------------------------------------
|
|
||||||
struct SSyncNode;
|
|
||||||
typedef struct SSyncNode SSyncNode;
|
|
||||||
|
|
||||||
struct SSyncBuffer;
|
|
||||||
typedef struct SSyncBuffer SSyncBuffer;
|
|
||||||
//-----------------------------------------
|
|
||||||
|
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -20,9 +20,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "cJSON.h"
|
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
// ------------------ ds -------------------
|
// ------------------ ds -------------------
|
||||||
|
@ -32,9 +29,6 @@ typedef struct SRaftId {
|
||||||
} SRaftId;
|
} SRaftId;
|
||||||
|
|
||||||
// ------------------ control -------------------
|
// ------------------ control -------------------
|
||||||
struct SSyncNode;
|
|
||||||
typedef struct SSyncNode SSyncNode;
|
|
||||||
|
|
||||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||||
void syncNodeRelease(SSyncNode* pNode);
|
void syncNodeRelease(SSyncNode* pNode);
|
||||||
|
|
||||||
|
|
|
@ -65,12 +65,14 @@ for (int i = 1; i < keyLen; ++i) { \
|
||||||
#define OTD_TIMESTAMP_COLUMN_NAME "ts"
|
#define OTD_TIMESTAMP_COLUMN_NAME "ts"
|
||||||
#define OTD_METRIC_VALUE_COLUMN_NAME "value"
|
#define OTD_METRIC_VALUE_COLUMN_NAME "value"
|
||||||
|
|
||||||
#define TS "_ts"
|
#define TS "_ts"
|
||||||
#define TS_LEN 3
|
#define TS_LEN 3
|
||||||
#define TAG "_tagNone"
|
#define TAG "_tag"
|
||||||
#define TAG_LEN 8
|
#define TAG_LEN 4
|
||||||
#define VALUE "value"
|
#define TAG_VALUE "NULL"
|
||||||
#define VALUE_LEN 5
|
#define TAG_VALUE_LEN 4
|
||||||
|
#define VALUE "value"
|
||||||
|
#define VALUE_LEN 5
|
||||||
|
|
||||||
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
|
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
|
||||||
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
|
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
|
||||||
|
@ -598,25 +600,33 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
|
||||||
kvVal->type = TSDB_DATA_TYPE_FLOAT;
|
kvVal->type = TSDB_DATA_TYPE_FLOAT;
|
||||||
kvVal->f = (float)result;
|
kvVal->f = (float)result;
|
||||||
}else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
|
}else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
|
||||||
if(result >= (double)INT64_MAX){
|
if(smlDoubleToInt64OverFlow(result)){
|
||||||
kvVal->i = INT64_MAX;
|
errno = 0;
|
||||||
}else if(result <= (double)INT64_MIN){
|
int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
|
||||||
kvVal->i = INT64_MIN;
|
if(errno == ERANGE){
|
||||||
}else{
|
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
|
||||||
kvVal->i = result;
|
return false;
|
||||||
|
}
|
||||||
|
kvVal->type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
kvVal->i = tmp;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
kvVal->type = TSDB_DATA_TYPE_BIGINT;
|
kvVal->type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
kvVal->i = (int64_t)result;
|
||||||
}else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
|
}else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
|
||||||
if(result < 0){
|
if(result >= (double)UINT64_MAX || result < 0){
|
||||||
smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal);
|
errno = 0;
|
||||||
return false;
|
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
|
||||||
}
|
if(errno == ERANGE || result < 0){
|
||||||
if(result >= (double)UINT64_MAX){
|
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
|
||||||
kvVal->u = UINT64_MAX;
|
return false;
|
||||||
}else{
|
}
|
||||||
kvVal->u = result;
|
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
kvVal->u = tmp;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
|
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
kvVal->u = result;
|
||||||
}else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
|
}else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
|
||||||
if(!IS_VALID_INT(result)){
|
if(!IS_VALID_INT(result)){
|
||||||
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
|
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
|
||||||
|
@ -1103,8 +1113,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
|
||||||
kv->keyLen = VALUE_LEN;
|
kv->keyLen = VALUE_LEN;
|
||||||
kv->value = value;
|
kv->value = value;
|
||||||
kv->length = valueLen;
|
kv->length = valueLen;
|
||||||
if(!smlParseValue(kv, &info->msgBuf) || kv->type == TSDB_DATA_TYPE_BINARY
|
if(!smlParseValue(kv, &info->msgBuf)){
|
||||||
|| kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BOOL){
|
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1124,8 +1133,8 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *c
|
||||||
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
|
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
kv->key = TAG;
|
kv->key = TAG;
|
||||||
kv->keyLen = TAG_LEN;
|
kv->keyLen = TAG_LEN;
|
||||||
kv->value = TAG;
|
kv->value = TAG_VALUE;
|
||||||
kv->length = TAG_LEN;
|
kv->length = TAG_VALUE_LEN;
|
||||||
kv->type = TSDB_DATA_TYPE_NCHAR;
|
kv->type = TSDB_DATA_TYPE_NCHAR;
|
||||||
if(cols) taosArrayPush(cols, &kv);
|
if(cols) taosArrayPush(cols, &kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2264,6 +2273,7 @@ static int32_t smlParseLine(SSmlHandle *info, char* lines[], int numLines){
|
||||||
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
|
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numLines; ++i) {
|
for (int32_t i = 0; i < numLines; ++i) {
|
||||||
|
|
|
@ -208,6 +208,7 @@ TEST(testCase, smlParseCols_Error_Test) {
|
||||||
memcpy(sql, data[i], len + 1);
|
memcpy(sql, data[i], len + 1);
|
||||||
SArray *cols = taosArrayInit(8, POINTER_BYTES);
|
SArray *cols = taosArrayInit(8, POINTER_BYTES);
|
||||||
int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf);
|
int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf);
|
||||||
|
printf("i:%d\n",i);
|
||||||
ASSERT_NE(ret, TSDB_CODE_SUCCESS);
|
ASSERT_NE(ret, TSDB_CODE_SUCCESS);
|
||||||
taosHashClear(dumplicateKey);
|
taosHashClear(dumplicateKey);
|
||||||
taosMemoryFree(sql);
|
taosMemoryFree(sql);
|
||||||
|
@ -272,11 +273,11 @@ TEST(testCase, smlParseCols_tag_Test) {
|
||||||
|
|
||||||
// nchar
|
// nchar
|
||||||
kv = (SSmlKv *)taosArrayGetP(cols, 0);
|
kv = (SSmlKv *)taosArrayGetP(cols, 0);
|
||||||
ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0);
|
ASSERT_EQ(strncasecmp(kv->key, TAG, TAG_LEN), 0);
|
||||||
ASSERT_EQ(kv->keyLen, strlen(TAG));
|
ASSERT_EQ(kv->keyLen, TAG_LEN);
|
||||||
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
|
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
|
||||||
ASSERT_EQ(kv->length, strlen(TAG));
|
ASSERT_EQ(kv->length, TAG_LEN);
|
||||||
ASSERT_EQ(strncasecmp(kv->value, TAG, strlen(TAG)), 0);
|
ASSERT_EQ(strncasecmp(kv->value, TAG_VALUE, TAG_VALUE_LEN), 0);
|
||||||
taosMemoryFree(kv);
|
taosMemoryFree(kv);
|
||||||
|
|
||||||
taosArrayDestroy(cols);
|
taosArrayDestroy(cols);
|
||||||
|
@ -506,7 +507,7 @@ TEST(testCase, smlProcess_influx_Test) {
|
||||||
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608403000000000",
|
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608403000000000",
|
||||||
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609404000000000",
|
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609404000000000",
|
||||||
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619405000000000",
|
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619405000000000",
|
||||||
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 145160640600000000",
|
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606406000000000",
|
||||||
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606407000000000",
|
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606407000000000",
|
||||||
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609408000000000",
|
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609408000000000",
|
||||||
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629409000000000",
|
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629409000000000",
|
||||||
|
@ -745,7 +746,7 @@ TEST(testCase, smlProcess_json1_Test) {
|
||||||
" }\n"
|
" }\n"
|
||||||
" }\n"
|
" }\n"
|
||||||
"]";
|
"]";
|
||||||
int ret = smlProcess(info, (char **)(&sql), -1);
|
int ret = smlProcess(info, (char **)(&sql), 1);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
// case 1
|
// case 1
|
||||||
|
@ -1220,4 +1221,56 @@ TEST(testCase, sml_TD15662_Test) {
|
||||||
ASSERT_EQ(ts, 1626006833639000000);
|
ASSERT_EQ(ts, 1626006833639000000);
|
||||||
|
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, sml_TD15735_Test) {
|
||||||
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_NE(taos, nullptr);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "use sml_db");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
|
||||||
|
ASSERT_NE(request, nullptr);
|
||||||
|
|
||||||
|
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
|
||||||
|
ASSERT_NE(info, nullptr);
|
||||||
|
|
||||||
|
const char *sql[1] = {
|
||||||
|
"{'metric': 'pekoiw', 'timestamp': {'value': 1626006833639000000, 'type': 'ns'}, 'value': {'value': False, 'type': 'bool'}, 'tags': {'t0': {'value': True, 'type': 'bool'}, 't1': {'value': 127, 'type': 'tinyint'}, 't2': {'value': 32767, 'type': 'smallint'}, 't3': {'value': 2147483647, 'type': 'int'}, 't4': {'value': 9223372036854775807, 'type': 'bigint'}, 't5': {'value': 11.12345027923584, 'type': 'float'}, 't6': {'value': 22.123456789, 'type': 'double'}, 't7': {'value': 'binaryTagValue', 'type': 'binary'}, 't8': {'value': 'ncharTagValue', 'type': 'nchar'}}}",
|
||||||
|
};
|
||||||
|
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
|
||||||
|
ASSERT_NE(ret, 0);
|
||||||
|
|
||||||
|
destroyRequest(request);
|
||||||
|
smlDestroyInfo(info);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, sml_TD15742_Test) {
|
||||||
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_NE(taos, nullptr);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(taos, "create database if not exists TD15742");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "use TD15742");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
|
||||||
|
ASSERT_NE(request, nullptr);
|
||||||
|
|
||||||
|
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
|
||||||
|
ASSERT_NE(info, nullptr);
|
||||||
|
|
||||||
|
const char *sql[] = {
|
||||||
|
"zgzbix 1626006833641 False id=zgzbix_992_38861 t0=t t1=127i8 t2=32767i16 t3=2147483647i32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"binaryTagValue\" t8=L\"ncharTagValue\"",
|
||||||
|
};
|
||||||
|
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
|
||||||
|
ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
destroyRequest(request);
|
||||||
|
smlDestroyInfo(info);
|
||||||
}
|
}
|
|
@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
|
||||||
|
|
||||||
extern const SVnodeCfg vnodeCfgDefault;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
int vnodeInit(int nthreads);
|
int32_t vnodeInit(int32_t nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||||
|
|
||||||
int32_t vnodeStart(SVnode *pVnode);
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
void vnodeStop(SVnode *pVnode);
|
void vnodeStop(SVnode *pVnode);
|
||||||
|
@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
|
||||||
|
|
||||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||||
void metaReaderClear(SMetaReader *pReader);
|
void metaReaderClear(SMetaReader *pReader);
|
||||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||||
int metaReadNext(SMetaReader *pReader);
|
int32_t metaReadNext(SMetaReader *pReader);
|
||||||
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
|
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
|
||||||
|
|
||||||
#if 1 // refact APIs below (TODO)
|
#if 1 // refact APIs below (TODO)
|
||||||
|
@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
|
||||||
|
|
||||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||||
int metaTbCursorNext(SMTbCursor *pTbCur);
|
int32_t metaTbCursorNext(SMTbCursor *pTbCur);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
|
@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle;
|
||||||
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
|
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
|
||||||
|
|
||||||
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
|
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
|
||||||
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||||
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||||
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
|
||||||
|
@ -207,15 +207,15 @@ struct SMetaReader {
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
SMetaEntry me;
|
SMetaEntry me;
|
||||||
void *pBuf;
|
void *pBuf;
|
||||||
int szBuf;
|
int32_t szBuf;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SMTbCursor {
|
struct SMTbCursor {
|
||||||
TBC *pDbc;
|
TBC *pDbc;
|
||||||
void *pKey;
|
void *pKey;
|
||||||
void *pVal;
|
void *pVal;
|
||||||
int kLen;
|
int32_t kLen;
|
||||||
int vLen;
|
int32_t vLen;
|
||||||
SMetaReader mr;
|
SMetaReader mr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// vnodeDebug ====================
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||||
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||||
|
@ -34,17 +33,17 @@ extern "C" {
|
||||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// vnodeCfg ====================
|
// vnodeCfg.c
|
||||||
extern const SVnodeCfg vnodeCfgDefault;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
int vnodeCheckCfg(const SVnodeCfg*);
|
int32_t vnodeCheckCfg(const SVnodeCfg*);
|
||||||
int vnodeEncodeConfig(const void* pObj, SJson* pJson);
|
int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
|
||||||
int vnodeDecodeConfig(const SJson* pJson, void* pObj);
|
int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
|
||||||
|
|
||||||
// vnodeModule ====================
|
// vnodeModule.c
|
||||||
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
|
||||||
|
|
||||||
// vnodeBufPool ====================
|
// vnodeBufPool.c
|
||||||
typedef struct SVBufPoolNode SVBufPoolNode;
|
typedef struct SVBufPoolNode SVBufPoolNode;
|
||||||
struct SVBufPoolNode {
|
struct SVBufPoolNode {
|
||||||
SVBufPoolNode* prev;
|
SVBufPoolNode* prev;
|
||||||
|
@ -62,37 +61,29 @@ struct SVBufPool {
|
||||||
SVBufPoolNode node;
|
SVBufPoolNode node;
|
||||||
};
|
};
|
||||||
|
|
||||||
int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
|
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
|
||||||
int vnodeCloseBufPool(SVnode* pVnode);
|
int32_t vnodeCloseBufPool(SVnode* pVnode);
|
||||||
void vnodeBufPoolReset(SVBufPool* pPool);
|
void vnodeBufPoolReset(SVBufPool* pPool);
|
||||||
|
|
||||||
// vnodeQuery ====================
|
// vnodeQuery.c
|
||||||
int vnodeQueryOpen(SVnode* pVnode);
|
int32_t vnodeQueryOpen(SVnode* pVnode);
|
||||||
void vnodeQueryClose(SVnode* pVnode);
|
void vnodeQueryClose(SVnode* pVnode);
|
||||||
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
||||||
|
|
||||||
// vnodeCommit ====================
|
// vnodeCommit.c
|
||||||
int vnodeBegin(SVnode* pVnode);
|
int32_t vnodeBegin(SVnode* pVnode);
|
||||||
int vnodeShouldCommit(SVnode* pVnode);
|
int32_t vnodeShouldCommit(SVnode* pVnode);
|
||||||
int vnodeCommit(SVnode* pVnode);
|
int32_t vnodeCommit(SVnode* pVnode);
|
||||||
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
||||||
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
|
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
|
||||||
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
||||||
int vnodeSyncCommit(SVnode* pVnode);
|
int32_t vnodeSyncCommit(SVnode* pVnode);
|
||||||
int vnodeAsyncCommit(SVnode* pVnode);
|
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
||||||
|
|
||||||
// vnodeCommit ====================
|
// vnodeSync.c
|
||||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||||
int32_t vnodeSyncStart(SVnode* pVnode);
|
void vnodeSyncStart(SVnode* pVnode);
|
||||||
void vnodeSyncClose(SVnode* pVnode);
|
void vnodeSyncClose(SVnode* pVnode);
|
||||||
void vnodeSyncSetMsgCb(SVnode* pVnode);
|
|
||||||
int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
|
||||||
int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
|
||||||
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
|
||||||
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
|
||||||
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
|
||||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
|
||||||
SSyncFSM* syncVnodeMakeFsm();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
||||||
rowLen += pCond->colList[i].bytes;
|
rowLen += pCond->colList[i].bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure the output SSDataBlock size be less than 2MB.
|
||||||
|
int32_t TWOMB = 2 * 1024 * 1024;
|
||||||
|
if (pReadHandle->outputCapacity * rowLen > TWOMB) {
|
||||||
|
pReadHandle->outputCapacity = TWOMB / rowLen;
|
||||||
|
}
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||||
if (pReadHandle->suppInfo.pstatis == NULL) {
|
if (pReadHandle->suppInfo.pstatis == NULL) {
|
||||||
|
@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
|
|
||||||
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
||||||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
||||||
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
|
|
||||||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
|
bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
|
||||||
|
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
|
||||||
|
if (cacheDataInFileBlockHole) {
|
||||||
// do not load file block into buffer
|
// do not load file block into buffer
|
||||||
int32_t step = ascScan ? 1 : -1;
|
int32_t step = ascScan ? 1 : -1;
|
||||||
|
|
||||||
TSKEY maxKey =
|
TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
|
||||||
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
|
|
||||||
cur->rows =
|
cur->rows =
|
||||||
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
|
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
|
||||||
pTsdbReadHandle->realNumOfRows = cur->rows;
|
pTsdbReadHandle->realNumOfRows = cur->rows;
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
pCheckInfo->lastKey = cur->win.ekey + step;
|
pCheckInfo->lastKey = cur->win.ekey + step;
|
||||||
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
|
||||||
|
if (!ascScan) {
|
||||||
TSWAP(cur->win.skey, cur->win.ekey);
|
TSWAP(cur->win.skey, cur->win.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
/*
|
/*
|
||||||
* no data in cache, only load data from file
|
* no data in cache, only load data from file
|
||||||
* during the query processing, data in cache will not be checked anymore.
|
* during the query processing, data in cache will not be checked anymore.
|
||||||
*
|
|
||||||
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
||||||
*/
|
*/
|
||||||
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
|
|
||||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
|
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
|
||||||
|
|
||||||
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) ||
|
bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows);
|
||||||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) {
|
if (wholeBlockReturned) {
|
||||||
pTsdbReadHandle->realNumOfRows = binfo.rows;
|
pTsdbReadHandle->realNumOfRows = binfo.rows;
|
||||||
|
|
||||||
cur->rows = binfo.rows;
|
cur->rows = binfo.rows;
|
||||||
cur->win = binfo.window;
|
cur->win = binfo.window;
|
||||||
cur->mixBlock = false;
|
cur->mixBlock = false;
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = true;
|
||||||
|
|
||||||
|
@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
cur->lastKey = binfo.window.skey - 1;
|
cur->lastKey = binfo.window.skey - 1;
|
||||||
cur->pos = -1;
|
cur->pos = -1;
|
||||||
}
|
}
|
||||||
} else { // partially copy to dest buffer
|
} else { // partially copy to dest buffer
|
||||||
|
// make sure to only load once
|
||||||
|
bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan)));
|
||||||
|
if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
|
||||||
|
code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
|
copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(cur->blockCompleted);
|
if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
|
||||||
|
ASSERT(cur->blockCompleted);
|
||||||
|
}
|
||||||
|
|
||||||
if (cur->rows == binfo.rows) {
|
if (cur->rows == binfo.rows) {
|
||||||
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
|
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
|
||||||
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
|
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
|
||||||
|
@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
|
||||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
|
||||||
|
|
||||||
int32_t pos = cur->pos;
|
int32_t step = ascScan? 1 : -1;
|
||||||
|
|
||||||
int32_t start = cur->pos;
|
int32_t start = cur->pos;
|
||||||
int32_t end = endPos;
|
int32_t end = endPos;
|
||||||
|
|
||||||
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
if (!ascScan) {
|
||||||
TSWAP(start, end);
|
TSWAP(start, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
|
||||||
// the time window should always be ascending order: skey <= ekey
|
// the time window should always be ascending order: skey <= ekey
|
||||||
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
|
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
|
||||||
cur->mixBlock = (numOfRows != pBlockInfo->rows);
|
cur->mixBlock = (numOfRows != pBlockInfo->rows);
|
||||||
cur->lastKey = tsArray[endPos] + step;
|
cur->lastKey = tsArray[endPos] + step;
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0));
|
||||||
|
|
||||||
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
|
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
|
||||||
pos = endPos + step;
|
int32_t pos = endPos + step;
|
||||||
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
||||||
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
||||||
|
|
||||||
|
@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
|
||||||
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
|
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
|
||||||
// NOTE: reverse the order to find the end position in data block
|
// NOTE: reverse the order to find the end position in data block
|
||||||
int32_t endPos = -1;
|
int32_t endPos = -1;
|
||||||
int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||||
|
int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||||
|
|
||||||
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
|
if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) {
|
||||||
endPos = pBlockInfo->rows - 1;
|
if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
|
||||||
cur->mixBlock = (cur->pos != 0);
|
endPos = pBlockInfo->rows - 1;
|
||||||
} else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
|
cur->mixBlock = (cur->pos != 0);
|
||||||
endPos = 0;
|
} else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
|
||||||
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
|
endPos = 0;
|
||||||
|
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
|
||||||
|
} else {
|
||||||
|
assert(pCols->numOfRows > 0);
|
||||||
|
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
|
||||||
|
cur->mixBlock = true;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(pCols->numOfRows > 0);
|
if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
|
||||||
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
|
endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
|
||||||
|
} else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
|
||||||
|
endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
|
||||||
|
} else {
|
||||||
|
ASSERT(pCols->numOfRows > 0);
|
||||||
|
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
|
||||||
|
|
||||||
|
// current data is more than the capacity
|
||||||
|
int32_t size = abs(cur->pos - endPos) + 1;
|
||||||
|
if (size > pTsdbReadHandle->outputCapacity) {
|
||||||
|
int32_t delta = size - pTsdbReadHandle->outputCapacity;
|
||||||
|
if (ascScan) {
|
||||||
|
endPos -= delta;
|
||||||
|
} else {
|
||||||
|
endPos += delta;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
|
||||||
|
|
||||||
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
|
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
|
||||||
|
|
||||||
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
|
static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
||||||
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||||
|
|
||||||
|
@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
|
||||||
cur->fid = pTsdbReadHandle->pFileGroup->fid;
|
cur->fid = pTsdbReadHandle->pFileGroup->fid;
|
||||||
|
|
||||||
STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
|
||||||
return getDataBlockRv(pTsdbReadHandle, pBlockInfo, exists);
|
return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
|
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
|
||||||
|
@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
|
||||||
} else {
|
} else {
|
||||||
moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
|
moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
|
||||||
STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
|
||||||
return getDataBlockRv(pTsdbReadHandle, pNext, exists);
|
return getDataBlock(pTsdbReadHandle, pNext, exists);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
|
|
||||||
// start the sync timer after the queue is ready
|
// start the sync timer after the queue is ready
|
||||||
int32_t vnodeStart(SVnode *pVnode) {
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
vnodeSyncSetMsgCb(pVnode);
|
|
||||||
vnodeSyncStart(pVnode);
|
vnodeSyncStart(pVnode);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,71 +13,62 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
|
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
|
||||||
|
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||||
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
|
||||||
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo = {
|
||||||
syncInfo.vgId = pVnode->config.vgId;
|
.vgId = pVnode->config.vgId,
|
||||||
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
.syncCfg = pVnode->config.syncCfg,
|
||||||
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
|
.pWal = pVnode->pWal,
|
||||||
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
|
.msgcb = NULL,
|
||||||
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
|
.FpSendMsg = vnodeSyncSendMsg,
|
||||||
|
.FpEqMsg = vnodeSyncEqMsg,
|
||||||
|
};
|
||||||
|
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
|
||||||
syncInfo.pWal = pVnode->pWal;
|
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
|
||||||
|
|
||||||
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
|
||||||
syncInfo.msgcb = NULL;
|
|
||||||
syncInfo.FpSendMsg = vnodeSyncSendMsg;
|
|
||||||
syncInfo.FpEqMsg = vnodeSyncEqMsg;
|
|
||||||
|
|
||||||
pVnode->sync = syncOpen(&syncInfo);
|
pVnode->sync = syncOpen(&syncInfo);
|
||||||
assert(pVnode->sync > 0);
|
if (pVnode->sync <= 0) {
|
||||||
|
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// for test
|
|
||||||
setPingTimerMS(pVnode->sync, 3000);
|
setPingTimerMS(pVnode->sync, 3000);
|
||||||
setElectTimerMS(pVnode->sync, 500);
|
setElectTimerMS(pVnode->sync, 500);
|
||||||
setHeartbeatTimerMS(pVnode->sync, 100);
|
setHeartbeatTimerMS(pVnode->sync, 100);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode) {
|
void vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||||
syncStart(pVnode->sync);
|
syncStart(pVnode->sync);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) {
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
// stop by ref id
|
|
||||||
syncStop(pVnode->sync);
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
|
|
||||||
|
|
||||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||||
|
|
||||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
pMsg->info.noResp = 1;
|
|
||||||
return tmsgSendReq(pEpSet, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
|
||||||
SVnode *pVnode = (SVnode *)(pFsm->data);
|
|
||||||
vnodeGetSnapshot(pVnode, pSnapshot);
|
|
||||||
|
|
||||||
/*
|
|
||||||
pSnapshot->data = NULL;
|
|
||||||
pSnapshot->lastApplyIndex = 0;
|
|
||||||
pSnapshot->lastApplyTerm = 0;
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
|
vnodeGetSnapshot(pFsm->data, pSnapshot);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshot != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||||
|
@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
|
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
pFsm->FpCommitCb = vnodeSyncCommitCb;
|
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackCb;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
|
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||||
return pFsm;
|
return pFsm;
|
||||||
}
|
}
|
|
@ -3546,11 +3546,12 @@ _error:
|
||||||
|
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
|
||||||
// todo add more information about exchange operation
|
// todo add more information about exchange operation
|
||||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
|
int32_t type = pOperator->operatorType;
|
||||||
|
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
*order = TSDB_ORDER_ASC;
|
*order = TSDB_ORDER_ASC;
|
||||||
*scanFlag = MAIN_SCAN;
|
*scanFlag = MAIN_SCAN;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
*order = pTableScanInfo->cond.order;
|
*order = pTableScanInfo->cond.order;
|
||||||
*scanFlag = pTableScanInfo->scanFlag;
|
*scanFlag = pTableScanInfo->scanFlag;
|
||||||
|
@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
|
||||||
|
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
|
||||||
|
int32_t TWOMB = 2 * 1024 * 1024;
|
||||||
|
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||||
|
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||||
|
}
|
||||||
initResultSizeInfo(pOperator, numOfRows);
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
|
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
|
||||||
|
|
||||||
pOperator->name = "ProjectOperator";
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
||||||
|
pOperator->name = "ProjectOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfExprs = num;
|
pOperator->numOfExprs = num;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||||
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -20,135 +20,41 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include "cJSON.h"
|
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "syncTools.h"
|
#include "syncTools.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define sFatal(...) \
|
// clang-format off
|
||||||
{ \
|
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||||
if (sDebugFlag & DEBUG_FATAL) { \
|
#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||||
taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
|
#define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||||
} \
|
#define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||||
}
|
#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define sError(...) \
|
#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
{ \
|
#define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||||
if (sDebugFlag & DEBUG_ERROR) { \
|
#define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||||
taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
|
#define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||||
} \
|
#define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||||
}
|
#define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define sWarn(...) \
|
#define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
{ \
|
// clang-format on
|
||||||
if (sDebugFlag & DEBUG_WARN) { \
|
|
||||||
taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sInfo(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_INFO) { \
|
|
||||||
taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sDebug(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_DEBUG) { \
|
|
||||||
taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sTrace(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_TRACE) { \
|
|
||||||
taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
|
|
||||||
#define sFatalLong(...) \
|
typedef struct SyncTimeout SyncTimeout;
|
||||||
{ \
|
typedef struct SyncClientRequest SyncClientRequest;
|
||||||
if (sDebugFlag & DEBUG_FATAL) { \
|
typedef struct SyncPing SyncPing;
|
||||||
taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
|
typedef struct SyncPingReply SyncPingReply;
|
||||||
} \
|
typedef struct SyncRequestVote SyncRequestVote;
|
||||||
}
|
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
|
||||||
#define sErrorLong(...) \
|
typedef struct SyncAppendEntries SyncAppendEntries;
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_ERROR) { \
|
|
||||||
taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sWarnLong(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_WARN) { \
|
|
||||||
taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sInfoLong(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_INFO) { \
|
|
||||||
taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sDebugLong(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_DEBUG) { \
|
|
||||||
taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
#define sTraceLong(...) \
|
|
||||||
{ \
|
|
||||||
if (sDebugFlag & DEBUG_TRACE) { \
|
|
||||||
taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SyncTimeout;
|
|
||||||
typedef struct SyncTimeout SyncTimeout;
|
|
||||||
|
|
||||||
struct SyncClientRequest;
|
|
||||||
typedef struct SyncClientRequest SyncClientRequest;
|
|
||||||
|
|
||||||
struct SyncPing;
|
|
||||||
typedef struct SyncPing SyncPing;
|
|
||||||
|
|
||||||
struct SyncPingReply;
|
|
||||||
typedef struct SyncPingReply SyncPingReply;
|
|
||||||
|
|
||||||
struct SyncRequestVote;
|
|
||||||
typedef struct SyncRequestVote SyncRequestVote;
|
|
||||||
|
|
||||||
struct SyncRequestVoteReply;
|
|
||||||
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
|
|
||||||
|
|
||||||
struct SyncAppendEntries;
|
|
||||||
typedef struct SyncAppendEntries SyncAppendEntries;
|
|
||||||
|
|
||||||
struct SyncAppendEntriesReply;
|
|
||||||
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
|
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
|
||||||
|
typedef struct SSyncEnv SSyncEnv;
|
||||||
struct SSyncEnv;
|
typedef struct SRaftStore SRaftStore;
|
||||||
typedef struct SSyncEnv SSyncEnv;
|
typedef struct SVotesGranted SVotesGranted;
|
||||||
|
typedef struct SVotesRespond SVotesRespond;
|
||||||
struct SRaftStore;
|
typedef struct SSyncIndexMgr SSyncIndexMgr;
|
||||||
typedef struct SRaftStore SRaftStore;
|
typedef struct SRaftCfg SRaftCfg;
|
||||||
|
typedef struct SSyncRespMgr SSyncRespMgr;
|
||||||
struct SVotesGranted;
|
|
||||||
typedef struct SVotesGranted SVotesGranted;
|
|
||||||
|
|
||||||
struct SVotesRespond;
|
|
||||||
typedef struct SVotesRespond SVotesRespond;
|
|
||||||
|
|
||||||
struct SSyncIndexMgr;
|
|
||||||
typedef struct SSyncIndexMgr SSyncIndexMgr;
|
|
||||||
|
|
||||||
struct SRaftCfg;
|
|
||||||
typedef struct SRaftCfg SRaftCfg;
|
|
||||||
|
|
||||||
struct SSyncRespMgr;
|
|
||||||
typedef struct SSyncRespMgr SSyncRespMgr;
|
|
||||||
|
|
||||||
typedef struct SSyncNode {
|
typedef struct SSyncNode {
|
||||||
// init by SSyncInfo
|
// init by SSyncInfo
|
||||||
|
|
|
@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilraftId2EpSet(destRaftId, &epSet);
|
syncUtilraftId2EpSet(destRaftId, &epSet);
|
||||||
if (pSyncNode->FpSendMsg != NULL) {
|
if (pSyncNode->FpSendMsg != NULL) {
|
||||||
pMsg->info.noResp = 1;
|
|
||||||
// htonl
|
// htonl
|
||||||
syncUtilMsgHtoN(pMsg->pCont);
|
syncUtilMsgHtoN(pMsg->pCont);
|
||||||
|
|
||||||
|
pMsg->info.noResp = 1;
|
||||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
|
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
|
||||||
|
@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
|
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
|
||||||
if (pSyncNode->FpSendMsg != NULL) {
|
if (pSyncNode->FpSendMsg != NULL) {
|
||||||
pMsg->info.noResp = 1;
|
|
||||||
// htonl
|
// htonl
|
||||||
syncUtilMsgHtoN(pMsg->pCont);
|
syncUtilMsgHtoN(pMsg->pCont);
|
||||||
|
|
||||||
|
pMsg->info.noResp = 1;
|
||||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
|
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
|
||||||
|
|
|
@ -97,11 +97,12 @@ int main(int argc, char** argv) {
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
SyncPingReply* pSyncMsg =
|
SyncPingReply* pSyncMsg =
|
||||||
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
|
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg = {0};
|
||||||
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
|
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
|
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
|
||||||
|
rpcMsg.info.noResp = 1;
|
||||||
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
|
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
|
||||||
|
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
#run user/pass_alter.sim
|
||||||
|
#run user/basic1.sim
|
||||||
|
#run user/privilege2.sim
|
||||||
|
#run user/user_len.sim
|
||||||
|
#run user/privilege1.sim
|
||||||
|
#run user/pass_len.sim
|
||||||
|
#run tstream/basic1.sim
|
||||||
|
#run tstream/basic0.sim
|
||||||
|
#run table/basic1.sim
|
||||||
|
#run trans/create_db.sim
|
||||||
|
#run stable/alter1.sim
|
||||||
|
#run stable/vnode3.sim
|
||||||
|
#run stable/metrics.sim
|
||||||
|
#run stable/show.sim
|
||||||
|
#run stable/values.sim
|
||||||
|
#run stable/dnode3.sim
|
||||||
|
#run stable/refcount.sim
|
||||||
|
#run stable/disk.sim
|
||||||
|
#run db/basic1.sim
|
||||||
|
#run db/basic3.sim
|
||||||
|
#run db/basic7.sim
|
||||||
|
#run db/basic6.sim
|
||||||
|
#run db/create_all_options.sim
|
||||||
|
#run db/basic2.sim
|
||||||
|
#run db/error1.sim
|
||||||
|
#run db/taosdlog.sim
|
||||||
|
#run db/alter_option.sim
|
||||||
|
#run mnode/basic1.sim
|
||||||
|
#run parser/fourArithmetic-basic.sim
|
||||||
|
#run parser/groupby-basic.sim
|
||||||
|
#run snode/basic1.sim
|
||||||
|
#run query/time_process.sim
|
||||||
|
#run query/stddev.sim
|
||||||
|
#run query/interval-offset.sim
|
||||||
|
#run query/charScalarFunction.sim
|
||||||
|
#run query/complex_select.sim
|
||||||
|
#run query/explain.sim
|
||||||
|
#run query/crash_sql.sim
|
||||||
|
#run query/diff.sim
|
||||||
|
#run query/complex_limit.sim
|
||||||
|
#run query/complex_having.sim
|
||||||
|
#run query/udf.sim
|
||||||
|
#run query/complex_group.sim
|
||||||
|
#run query/interval.sim
|
||||||
|
#run query/session.sim
|
||||||
|
|
||||||
|
print ========> dead lock failed when 2 rows in outputCapacity
|
||||||
|
run query/scalarFunction.sim
|
||||||
|
run query/scalarNull.sim
|
||||||
|
run query/complex_where.sim
|
||||||
|
run tmq/basic1.sim
|
||||||
|
run tmq/basic4.sim
|
||||||
|
run tmq/basic1Of2Cons.sim
|
||||||
|
run tmq/prepareBasicEnv-1vgrp.sim
|
||||||
|
run tmq/topic.sim
|
||||||
|
run tmq/basic4Of2Cons.sim
|
||||||
|
run tmq/prepareBasicEnv-4vgrp.sim
|
||||||
|
run tmq/basic3.sim
|
||||||
|
run tmq/basic2Of2Cons.sim
|
||||||
|
run tmq/basic2.sim
|
||||||
|
run tmq/basic3Of2Cons.sim
|
||||||
|
run tmq/basic2Of2ConsOverlap.sim
|
||||||
|
run tmq/clearConsume.sim
|
||||||
|
run qnode/basic1.sim
|
||||||
|
run dnode/basic1.sim
|
||||||
|
run show/basic.sim
|
||||||
|
run insert/basic1.sim
|
||||||
|
run insert/basic0.sim
|
||||||
|
run insert/backquote.sim
|
||||||
|
run insert/null.sim
|
||||||
|
run sync/oneReplica1VgElectWithInsert.sim
|
||||||
|
run sync/threeReplica1VgElect.sim
|
||||||
|
run sync/oneReplica1VgElect.sim
|
||||||
|
run sync/insertDataByRunBack.sim
|
||||||
|
run sync/threeReplica1VgElectWihtInsert.sim
|
||||||
|
run sma/tsmaCreateInsertData.sim
|
||||||
|
run sma/rsmaCreateInsertQuery.sim
|
||||||
|
run valgrind/checkError.sim
|
||||||
|
run bnode/basic1.sim
|
Loading…
Reference in New Issue