Merge branch '3.0' into feature/qnode

This commit is contained in:
dapan1121 2022-06-11 17:58:53 +08:00 committed by GitHub
commit 0046593d4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1439 additions and 441 deletions

View File

@ -236,6 +236,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX

View File

@ -121,7 +121,7 @@ typedef enum EFunctionType {
// internal function
FUNCTION_TYPE_SELECT_VALUE,
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL,
@ -170,6 +170,7 @@ bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);

View File

@ -47,7 +47,7 @@ typedef struct SDatabaseOptions {
int32_t maxRowsPerBlock;
int32_t minRowsPerBlock;
SNodeList* pKeep;
int32_t keep[3];
int64_t keep[3];
int32_t pages;
int32_t pagesize;
char precisionStr[3];

View File

@ -48,6 +48,7 @@ typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
TAOS_SYNC_ONLY_ONE_REPLICA = 3,
} ESyncProposeCode;
typedef enum {
@ -200,6 +201,9 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
// to be moved to static
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);

View File

@ -398,6 +398,8 @@ typedef struct SyncSnapshotSend {
SyncTerm term;
SyncIndex lastIndex; // lastIndex of snapshot
SyncTerm lastTerm; // lastTerm of snapshot
SyncIndex lastConfigIndex;
SSyncCfg lastConfig;
SyncTerm privateTerm;
int32_t seq;
uint32_t dataLen;
@ -456,6 +458,36 @@ void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
// ---------------------------------------------
typedef struct SyncLeaderTransfer {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
/*
SRaftId srcId;
SRaftId destId;
*/
SRaftId newLeaderId;
} SyncLeaderTransfer;
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg);
void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen);
void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg);
char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len);
SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len);
void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg);
void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg);
SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
// for debug ----------------------
void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg);
void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
// on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);

View File

@ -652,6 +652,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2654)
#define TSDB_CODE_PAR_INVALID_DELETE_WHERE TAOS_DEF_ERROR_CODE(0, 0x2655)
#define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656)
#define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)

View File

@ -72,7 +72,7 @@ for (int i = 1; i < keyLen; ++i) { \
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5
#define LINE_BATCH 20
#define LINE_BATCH 20000
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
@ -161,7 +161,6 @@ typedef struct {
typedef struct{
SRequestObj* request;
SCatalog* catalog;
tsem_t sem;
TdThreadSpinlock lock;
} Params;
@ -1297,9 +1296,46 @@ static void smlDestroyTableInfo(SSmlHandle* info, SSmlTableInfo *tag){
taosMemoryFree(tag);
}
static int32_t smlKvTimeArrayCompare(const void* key1, const void* key2) {
SArray *s1 = *(SArray **)key1;
SArray *s2 = *(SArray **)key2;
SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlKvTimeHashCompare(const void* key1, const void* key2) {
SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv *kv1 = (SSmlKv *)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = (SSmlKv *)taosHashGet(s2, TS, TS_LEN);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){
taosArrayPush(oneTable->cols, &cols);
void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE);
if(p == NULL){
taosArrayPush(oneTable->cols, &cols);
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}
return TSDB_CODE_SUCCESS;
}
@ -1312,8 +1348,13 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
taosArrayPush(oneTable->cols, &kvHash);
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE);
if(p == NULL){
taosArrayPush(oneTable->cols, &kvHash);
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
}
return TSDB_CODE_SUCCESS;
}
@ -1424,6 +1465,11 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->precision = precision;
info->protocol = protocol;
@ -2207,6 +2253,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
(*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlBindData failed", info->id);
return code;
}
oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable);
@ -2272,7 +2319,7 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
code = smlParseLine(info, lines, numLines);
if (code != 0) {
uError("SML:0x%"PRIx64" smlParseLine error : %s", info->id, tstrerror(code));
goto cleanup;
return code;
}
info->cost.lineNum = numLines;
@ -2288,24 +2335,27 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
goto cleanup;
return code;
}
info->cost.insertBindTime = taosGetTimestampUs();
code = smlInsertData(info);
if (code != 0) {
uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
goto cleanup;
return code;
}
info->cost.endTime = taosGetTimestampUs();
cleanup:
info->cost.code = code;
smlPrintStatisticInfo(info);
return code;
}
static int32_t isSchemalessDb(STscObj *taos, SRequestObj* request, SCatalog *catalog){
static int32_t isSchemalessDb(STscObj *taos, SRequestObj* request){
SCatalog* catalog = NULL;
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML get catalog error %d", code);
return code;
}
SName name;
tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
@ -2318,7 +2368,7 @@ static int32_t isSchemalessDb(STscObj *taos, SRequestObj* request, SCatalog *cat
conn.requestObjRefId = request->self;
conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
int32_t code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2345,6 +2395,9 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
printf("SML:0x%" PRIx64 " insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows);
Params *pParam = info->params;
bool isLast = info->isLast;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info);
if(isLast){
@ -2389,20 +2442,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0);
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &params.catalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML get catalog error %d", code);
request->code = code;
goto end;
}
if(request->pDb == NULL){
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end;
}
if(isSchemalessDb(((STscObj *)taos), request, params.catalog) != TSDB_CODE_SUCCESS){
if(isSchemalessDb(((STscObj *)taos), request) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end;
@ -2452,11 +2498,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
info->params = &params;
info->pCatalog = params.catalog;
info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info;
code = smlProcess(info, lines, perBatch);
int32_t code = smlProcess(info, lines, perBatch);
lines += perBatch;
if (code != TSDB_CODE_SUCCESS){
info->pRequest->body.queryFp(info, req, code);

View File

@ -476,22 +476,39 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(sql);
}
TEST(testCase, smlGetTimestampLen_Test) {
uint8_t len = smlGetTimestampLen(0);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(390);
ASSERT_EQ(len, 3);
len = smlGetTimestampLen(-1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
}
TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists inflx_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists inflx_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use inflx_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606401000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607402000000000",
@ -505,19 +522,20 @@ TEST(testCase, smlProcess_influx_Test) {
"stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=\"kk\",c4=4 1451629501000000000",
"stable,t2=t2,t1=t1,t3=t3 c1=1,c3=\"\",c4=4 1451629602000000000",
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_91e0b182be80332b5c530cbf872f760e");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
pRes = taos_query(taos, "select * from t_91e0b182be80332b5c530cbf872f760e");
ASSERT_NE(pRes, nullptr);
int fieldNum = taos_field_count(pRes);
ASSERT_EQ(fieldNum, 11);
printf("fieldNum:%d\n", fieldNum);
TAOS_ROW row = NULL;
int32_t rowIndex = 0;
while((row = taos_fetch_row(res)) != NULL) {
while((row = taos_fetch_row(pRes)) != NULL) {
int64_t ts = *(int64_t*)row[0];
double load_capacity = *(double*)row[1];
double fuel_capacity = *(double*)row[2];
@ -546,18 +564,18 @@ TEST(testCase, smlProcess_influx_Test) {
}
rowIndex++;
}
taos_free_result(res);
taos_free_result(pRes);
// case 2
res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
pRes = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(pRes, nullptr);
fieldNum = taos_field_count(pRes);
ASSERT_EQ(fieldNum, 5);
printf("fieldNum:%d\n", fieldNum);
rowIndex = 0;
while((row = taos_fetch_row(res)) != NULL) {
int *length = taos_fetch_lengths(res);
while((row = taos_fetch_row(pRes)) != NULL) {
int *length = taos_fetch_lengths(pRes);
int64_t ts = *(int64_t*)row[0];
double c1 = *(double*)row[1];
@ -580,20 +598,16 @@ TEST(testCase, smlProcess_influx_Test) {
}
rowIndex++;
}
taos_free_result(res);
taos_free_result(pRes);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
pRes = taos_query(taos, "show tables");
ASSERT_NE(pRes, nullptr);
row = taos_fetch_row(res);
int rowNum = taos_affected_rows(res);
row = taos_fetch_row(pRes);
int rowNum = taos_affected_rows(pRes);
ASSERT_EQ(rowNum, 5);
taos_free_result(res);
destroyRequest(request);
smlDestroyInfo(info);
taos_free_result(pRes);
}
// different types
@ -601,122 +615,79 @@ TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"measure,t1=3 c1=8",
"measure,t2=3 c1=8u8"
};
int ret = smlProcess(info, (char **)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, smlGetTimestampLen_Test) {
uint8_t len = smlGetTimestampLen(0);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(390);
ASSERT_EQ(len, 3);
len = smlGetTimestampLen(-1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, smlProcess_telnet_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists telnet_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists telnet_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use telnet_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0",
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ",
"sys.if.bytes.out 1479496102 1.3E3 network=tcp",
" sys.procs.running 1479496100 42 host=web01 "
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
pRes = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
ASSERT_NE(pRes, nullptr);
int fieldNum = taos_field_count(pRes);
ASSERT_EQ(fieldNum, 2);
TAOS_ROW row = taos_fetch_row(res);
TAOS_ROW row = taos_fetch_row(pRes);
int64_t ts = *(int64_t*)row[0];
double c1 = *(double*)row[1];
ASSERT_EQ(ts, 1479496100000);
ASSERT_EQ(c1, 42);
int rowNum = taos_affected_rows(res);
int rowNum = taos_affected_rows(pRes);
ASSERT_EQ(rowNum, 1);
taos_free_result(res);
taos_free_result(pRes);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
pRes = taos_query(taos, "show tables");
ASSERT_NE(pRes, nullptr);
row = taos_fetch_row(res);
rowNum = taos_affected_rows(res);
row = taos_fetch_row(pRes);
rowNum = taos_affected_rows(pRes);
ASSERT_EQ(rowNum, 3);
taos_free_result(res);
destroyRequest(request);
smlDestroyInfo(info);
taos_free_result(pRes);
}
TEST(testCase, smlProcess_json1_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists json_db");
TAOS_RES *pRes = taos_query(taos, "create database if not exists json_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use json_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql =
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
@ -724,6 +695,7 @@ TEST(testCase, smlProcess_json1_Test) {
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"id\": \"t1\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
@ -736,55 +708,48 @@ TEST(testCase, smlProcess_json1_Test) {
" \"dc\": \"lga\"\n"
" }\n"
" }\n"
"]";
int ret = smlProcess(info, (char **)(&sql), 1);
ASSERT_EQ(ret, 0);
"]"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
pRes = taos_query(taos, "select * from t1");
ASSERT_NE(pRes, nullptr);
int fieldNum = taos_field_count(pRes);
ASSERT_EQ(fieldNum, 2);
TAOS_ROW row = taos_fetch_row(res);
TAOS_ROW row = taos_fetch_row(pRes);
int64_t ts = *(int64_t*)row[0];
double c1 = *(double*)row[1];
ASSERT_EQ(ts, 1346846400000);
ASSERT_EQ(c1, 18);
int rowNum = taos_affected_rows(res);
int rowNum = taos_affected_rows(pRes);
ASSERT_EQ(rowNum, 1);
taos_free_result(res);
taos_free_result(pRes);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
pRes = taos_query(taos, "show tables");
ASSERT_NE(pRes, nullptr);
row = taos_fetch_row(res);
rowNum = taos_affected_rows(res);
row = taos_fetch_row(pRes);
rowNum = taos_affected_rows(pRes);
ASSERT_EQ(rowNum, 2);
taos_free_result(res);
destroyRequest(request);
smlDestroyInfo(info);
taos_free_result(pRes);
}
TEST(testCase, smlProcess_json2_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql =
const char *sql[] = {
"{\n"
" \"metric\": \"meter_current0\",\n"
" \"timestamp\": {\n"
@ -806,29 +771,23 @@ TEST(testCase, smlProcess_json2_Test) {
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
int32_t ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
"}"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, smlProcess_json3_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql =
const char *sql[] ={
"{\n"
" \"metric\": \"meter_current1\",\n"
" \"timestamp\": {\n"
@ -878,29 +837,23 @@ TEST(testCase, smlProcess_json3_Test) {
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
int32_t ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
"}"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, smlProcess_json4_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql = "{\n"
const char *sql[] = {"{\n"
" \"metric\": \"meter_current2\",\n"
" \"timestamp\": {\n"
" \"value\" : 1346846500000,\n"
@ -940,18 +893,17 @@ TEST(testCase, smlProcess_json4_Test) {
" \"t9\": false,\n"
" \"id\": \"d1001\"\n"
" }\n"
"}";
int32_t ret = smlProcess(info, (char**)(&sql), -1);
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
"}"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, smlParseTelnetLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
@ -1000,34 +952,27 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
"sys.procs.running 1479496104000 42 host=web01",
"sys.procs.running 1479496104000 42u8 host=web01"
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(taos_errno(pRes), 0);
taos_free_result(pRes);
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_json_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
@ -1095,19 +1040,13 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
@ -1129,30 +1068,22 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
" },\n"
"]",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
@ -1174,90 +1105,64 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
" },\n"
"]",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, sml_TD15662_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_15662 precision 'ns'");
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_15662 precision 'ns' schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_15662");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"hetrey c0=f,c1=127i8 1626006833639",
"hetrey,t1=r c0=f,c1=127i8 1626006833640",
};
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, sml_TD15735_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[1] = {
"{'metric': 'pekoiw', 'timestamp': {'value': 1626006833639000000, 'type': 'ns'}, 'value': {'value': False, 'type': 'bool'}, 'tags': {'t0': {'value': True, 'type': 'bool'}, 't1': {'value': 127, 'type': 'tinyint'}, 't2': {'value': 32767, 'type': 'smallint'}, 't3': {'value': 2147483647, 'type': 'int'}, 't4': {'value': 9223372036854775807, 'type': 'bigint'}, 't5': {'value': 11.12345027923584, 'type': 'float'}, 't6': {'value': 22.123456789, 'type': 'double'}, 't7': {'value': 'binaryTagValue', 'type': 'binary'}, 't8': {'value': 'ncharTagValue', 'type': 'nchar'}}}",
};
int32_t ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_NE(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_NE(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, sml_TD15742_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists TD15742");
TAOS_RES* pRes = taos_query(taos, "create database if not exists TD15742 schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use TD15742");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"test_ms,t0=t c0=f 1626006833641",
};
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
destroyRequest(request);
smlDestroyInfo(info);
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
TEST(testCase, sml_params_Test) {
@ -1325,8 +1230,8 @@ TEST(testCase, sml_oom_Test) {
pRes = taos_query(taos, "use oom");
taos_free_result(pRes);
TAOS_RES* res = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(res), 0);
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}

View File

@ -1424,10 +1424,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
char tmp[128] = {0};
int32_t len = 0;
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2,
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2,
pDb->cfg.daysToKeep0);
} else {
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1,
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1,
pDb->cfg.daysToKeep2);
}
@ -1592,4 +1592,3 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -86,11 +86,15 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int nData = 0;
tb_uid_t uid = 0;
metaRLock(pMeta);
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pData, &nData) == 0) {
uid = *(tb_uid_t *)pData;
tdbFree(pData);
}
metaULock(pMeta);
return 0;
}

View File

@ -793,7 +793,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
msgIter.suid = 0;
}
#ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
tDecoderClear(&decoder);
} else {
submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);

View File

@ -2649,8 +2649,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, int8_t calTrigger,
__get_win_info_ fn) {
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed,
__get_win_info_ fn) {
// Todo(liuyao) save window to tdb
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
@ -2658,19 +2658,9 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
SResultWindowInfo* pSeWin = fn(pWin);
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) {
if (!pSeWin->isClosed) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = 0;
pos->pos = pSeWin->pos;
*(int64_t*)pos->key = pSeWin->win.ekey;
if (!taosArrayPush(pClosed, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSeWin->isClosed = true;
if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true;
}
}
@ -2681,6 +2671,19 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
return TSDB_CODE_SUCCESS;
}
int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
void* pWin = taosArrayGet(pWins, i);
SResultWindowInfo* pSeWin = fn(pWin);
if (!pSeWin->isClosed) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -2703,6 +2706,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@ -2723,7 +2727,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo);
continue;
}
if (isFinalSession(pInfo)) {
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
@ -2735,15 +2744,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
SArray* pClosed = taosArrayInit(16, POINTER_BYTES);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getSessionWinInfo);
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pStUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset);
@ -3067,6 +3071,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@ -3078,6 +3083,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo);
continue;
}
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
@ -3085,15 +3094,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
SArray* pClosed = taosArrayInit(16, POINTER_BYTES);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getStateWinInfo);
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pSeUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset);

View File

@ -41,6 +41,7 @@ extern "C" {
#define FUNC_MGT_SCAN_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(12)
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -294,7 +294,8 @@ static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int
pValue->notReserved = true;
}
pFunc->node.resType = (SDataType){.bytes = getApercentileMaxSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType =
(SDataType){.bytes = getApercentileMaxSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -479,7 +480,8 @@ static int32_t translateElapsedImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
}
}
pFunc->node.resType = (SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType =
(SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -593,7 +595,8 @@ static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType =
(SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -631,7 +634,8 @@ static int32_t translateHLLImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len
}
if (isPartial) {
pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
pFunc->node.resType =
(SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
}
@ -1127,7 +1131,7 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) {
return false;
@ -1320,7 +1324,7 @@ static int32_t translateSelectValue(SFunctionNode* pFunc, char* pErrBuf, int32_t
}
static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pFunc->node.resType = (SDataType) {.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR};
pFunc->node.resType = (SDataType){.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
@ -1329,7 +1333,6 @@ static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv
return true;
}
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
@ -1625,7 +1628,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "histogram",
.type = FUNCTION_TYPE_HISTOGRAM,
.classification = FUNC_MGT_AGG_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_FILL_FUNC,
.translateFunc = translateHistogram,
.getEnvFunc = getHistogramFuncEnv,
.initFunc = histogramFunctionSetup,

View File

@ -159,6 +159,8 @@ bool fmIsRepeatScanFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId,
bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); }
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {

View File

@ -346,25 +346,30 @@ SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLitera
return (SNode*)val;
}
static int32_t addParamToLogicConditionNode(SLogicConditionNode* pCond, SNode* pParam) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam) && pCond->condType == ((SLogicConditionNode*)pParam)->condType) {
int32_t code = nodesListAppendList(pCond->pParameterList, ((SLogicConditionNode*)pParam)->pParameterList);
((SLogicConditionNode*)pParam)->pParameterList = NULL;
nodesDestroyNode(pParam);
return code;
} else {
return nodesListAppend(pCond->pParameterList, pParam);
}
}
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2) {
CHECK_PARSER_STATUS(pCxt);
SLogicConditionNode* cond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
CHECK_OUT_OF_MEM(cond);
cond->condType = type;
cond->pParameterList = nodesMakeList();
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type == ((SLogicConditionNode*)pParam1)->condType) {
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList);
((SLogicConditionNode*)pParam1)->pParameterList = NULL;
nodesDestroyNode(pParam1);
} else {
nodesListAppend(cond->pParameterList, pParam1);
int32_t code = addParamToLogicConditionNode(cond, pParam1);
if (TSDB_CODE_SUCCESS == code && NULL != pParam2) {
code = addParamToLogicConditionNode(cond, pParam2);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type == ((SLogicConditionNode*)pParam2)->condType) {
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList);
((SLogicConditionNode*)pParam2)->pParameterList = NULL;
nodesDestroyNode(pParam2);
} else {
nodesListAppend(cond->pParameterList, pParam2);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(cond);
return NULL;
}
return (SNode*)cond;
}

View File

@ -765,7 +765,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
}
int32_t len = 0;
if (!taosMbsToUcs4(pVal->literal, pVal->node.resType.bytes, (TdUcs4*)varDataVal(pVal->datum.p),
if (!taosMbsToUcs4(pVal->literal, strlen(pVal->literal), (TdUcs4*)varDataVal(pVal->datum.p),
targetDt.bytes - VARSTR_HEADER_SIZE, &len)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
@ -1006,6 +1006,9 @@ static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
}
static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsAggFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (beforeHaving(pCxt->currClause)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);
}
@ -1023,6 +1026,9 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
}
static int32_t translateScanPseudoColumnFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsScanPseudoColumnFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (0 == LIST_LENGTH(pFunc->pParameterList)) {
if (QUERY_NODE_REAL_TABLE != nodeType(pCxt->pCurrSelectStmt->pFromTable)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TBNAME);
@ -1039,6 +1045,9 @@ static int32_t translateScanPseudoColumnFunc(STranslateContext* pCxt, SFunctionN
}
static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsIndefiniteRowsFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (SQL_CLAUSE_SELECT != pCxt->currClause || pCxt->pCurrSelectStmt->hasIndefiniteRowsFunc ||
pCxt->pCurrSelectStmt->hasAggFuncs) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
@ -1049,6 +1058,18 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidFillFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidFillFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt->pWindow &&
QUERY_NODE_INTERVAL_WINDOW == nodeType(pCxt->pCurrSelectStmt->pWindow) &&
NULL != ((SIntervalWindowNode*)pCxt->pCurrSelectStmt->pWindow)->pFill) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static void setFuncClassification(SSelectStmt* pSelect, SFunctionNode* pFunc) {
if (NULL != pSelect) {
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
@ -1066,15 +1087,18 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
}
pCxt->errCode = getFuncInfo(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsAggFunc(pFunc->funcId)) {
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateAggFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsScanPseudoColumnFunc(pFunc->funcId)) {
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateScanPseudoColumnFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsIndefiniteRowsFunc(pFunc->funcId)) {
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateIndefiniteRowsFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidFillFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
setFuncClassification(pCxt->pCurrSelectStmt, pFunc);
}
@ -2397,7 +2421,9 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
}
SNode* pRetention = NULL;
SValueNode* pPrevFreq = NULL;
SValueNode* pPrevKeep = NULL;
SNode* pRetention = NULL;
FOREACH(pRetention, pRetentions) {
SNode* pNode = NULL;
FOREACH(pNode, ((SNodeListNode*)pRetention)->pNodeList) {
@ -2406,6 +2432,16 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
return pCxt->errCode;
}
}
SValueNode* pFreq = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 0);
SValueNode* pKeep = (SValueNode*)nodesListGetNode(((SNodeListNode*)pRetention)->pNodeList, 1);
if (pFreq->datum.i <= 0 || 'n' == pFreq->unit || 'y' == pFreq->unit || pFreq->datum.i >= pKeep->datum.i ||
(NULL != pPrevFreq && pPrevFreq->datum.i >= pFreq->datum.i) ||
(NULL != pPrevKeep && pPrevKeep->datum.i > pKeep->datum.i)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
}
pPrevFreq = pFreq;
pPrevKeep = pKeep;
}
return TSDB_CODE_SUCCESS;
@ -2625,12 +2661,23 @@ static int32_t checkTableSmaOption(STranslateContext* pCxt, SCreateTableStmt* pS
return TSDB_CODE_SUCCESS;
}
static bool validRollupFunc(const char* pFunc) {
static const char* rollupFuncs[] = {"avg", "sum", "min", "max", "last", "first"};
static const int32_t numOfRollupFuncs = (sizeof(rollupFuncs) / sizeof(char*));
for (int i = 0; i < numOfRollupFuncs; ++i) {
if (0 == strcmp(rollupFuncs[i], pFunc)) {
return true;
}
}
return false;
}
static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs) {
if (NULL == pFuncs) {
return TSDB_CODE_SUCCESS;
}
if (1 != LIST_LENGTH(pFuncs)) {
if (1 != LIST_LENGTH(pFuncs) || !validRollupFunc(((SFunctionNode*)nodesListGetNode(pFuncs, 0))->functionName)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
}
return TSDB_CODE_SUCCESS;
@ -3115,15 +3162,14 @@ static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pSt
SName tableName;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name);
alterReq.alterType = pStmt->alterType;
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) {
return TSDB_CODE_FAILED;
} else {
if (TSDB_CODE_SUCCESS != setAlterTableField(pStmt, &alterReq)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
return buildCmdMsg(pCxt, TDMT_MND_ALTER_STB, (FSerializeFunc)tSerializeSMAlterStbReq, &alterReq);
int32_t code = setAlterTableField(pStmt, &alterReq);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_ALTER_STB, (FSerializeFunc)tSerializeSMAlterStbReq, &alterReq);
}
return code;
}
static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) {
@ -3203,7 +3249,7 @@ static int32_t nodeTypeToShowType(ENodeType nt) {
case QUERY_NODE_SHOW_QUERIES_STMT:
return TSDB_MGMT_TABLE_QUERIES;
case QUERY_NODE_SHOW_VARIABLE_STMT:
return 0; // todo
return TSDB_MGMT_TABLE_CONFIGS;
default:
break;
}
@ -3810,6 +3856,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
code = translateShow(pCxt, (SShowStmt*)pNode);
break;
case QUERY_NODE_CREATE_INDEX_STMT:
@ -4964,7 +5011,11 @@ static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
return buildUpdateOptionsReq(pCxt, pStmt, pReq);
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
return buildRenameColReq(pCxt, pStmt, pTableMeta, pReq);
if (TSDB_CHILD_TABLE == pTableMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
} else {
return buildRenameColReq(pCxt, pStmt, pTableMeta, pReq);
}
default:
break;
}

View File

@ -76,7 +76,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_INVALID_KEEP_ORDER:
return "Invalid keep value, should be keep0 <= keep1 <= keep2";
case TSDB_CODE_PAR_INVALID_KEEP_VALUE:
return "Invalid option keep: %d, %d, %d valid range: [%d, %d]";
return "Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]";
case TSDB_CODE_PAR_INVALID_COMMENT_OPTION:
return "Invalid option comment, length cannot exceed %d";
case TSDB_CODE_PAR_INVALID_F_RANGE_OPTION:
@ -182,6 +182,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "The DELETE statement must have a definite time window range";
case TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG:
return "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes";
case TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC:
return "%s function not allowed in fill query";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:

View File

@ -24,7 +24,7 @@ class ParserInitialATest : public ParserDdlTest {};
TEST_F(ParserInitialATest, alterAccount) {
useDb("root", "test");
run("ALTER ACCOUNT ac_wxy PASS '123456'", TSDB_CODE_PAR_EXPRIE_STATEMENT);
run("ALTER ACCOUNT ac_wxy PASS '123456'", TSDB_CODE_PAR_EXPRIE_STATEMENT, PARSER_STAGE_PARSE);
}
TEST_F(ParserInitialATest, alterDnode) {
@ -157,8 +157,8 @@ TEST_F(ParserInitialATest, alterSTable) {
20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE st1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
run("ALTER TABLE st1 RENAME COLUMN c1 cc1");
// setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
// run("ALTER TABLE st1 RENAME COLUMN c1 cc1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD TAG tag11 BIGINT");
@ -177,6 +177,12 @@ TEST_F(ParserInitialATest, alterSTable) {
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
}
TEST_F(ParserInitialATest, alterSTableSemanticCheck) {
useDb("root", "test");
run("ALTER TABLE st1 RENAME COLUMN c1 cc1", TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
TEST_F(ParserInitialATest, alterTable) {
useDb("root", "test");
@ -299,6 +305,12 @@ TEST_F(ParserInitialATest, alterTable) {
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
}
TEST_F(ParserInitialATest, alterTableSemanticCheck) {
useDb("root", "test");
run("ALTER TABLE st1s1 RENAME COLUMN c1 cc1", TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
TEST_F(ParserInitialATest, alterUser) {
useDb("root", "test");
@ -323,7 +335,7 @@ TEST_F(ParserInitialATest, balanceVgroup) {
TEST_F(ParserInitialATest, bug001) {
useDb("root", "test");
run("ALTER DATABASE db WAL 0 # td-14436", TSDB_CODE_PAR_SYNTAX_ERROR);
run("ALTER DATABASE db WAL 0 # td-14436", TSDB_CODE_PAR_SYNTAX_ERROR, PARSER_STAGE_PARSE);
}
} // namespace ParserTest

View File

@ -27,7 +27,7 @@ class ParserInitialCTest : public ParserDdlTest {};
TEST_F(ParserInitialCTest, createAccount) {
useDb("root", "test");
run("CREATE ACCOUNT ac_wxy PASS '123456'", TSDB_CODE_PAR_EXPRIE_STATEMENT);
run("CREATE ACCOUNT ac_wxy PASS '123456'", TSDB_CODE_PAR_EXPRIE_STATEMENT, PARSER_STAGE_PARSE);
}
TEST_F(ParserInitialCTest, createBnode) {
@ -186,7 +186,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
setDbReplicaFunc(3);
addDbRetentionFunc(15 * MILLISECOND_PER_SECOND, 7 * MILLISECOND_PER_DAY, TIME_UNIT_SECOND, TIME_UNIT_DAY);
addDbRetentionFunc(1 * MILLISECOND_PER_MINUTE, 21 * MILLISECOND_PER_DAY, TIME_UNIT_MINUTE, TIME_UNIT_DAY);
addDbRetentionFunc(15 * MILLISECOND_PER_MINUTE, 5, TIME_UNIT_MINUTE, TIME_UNIT_YEAR);
addDbRetentionFunc(15 * MILLISECOND_PER_MINUTE, 500 * MILLISECOND_PER_DAY, TIME_UNIT_MINUTE, TIME_UNIT_DAY);
setDbStrictaFunc(1);
setDbWalLevelFunc(2);
setDbVgroupsFunc(100);
@ -205,7 +205,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
"PAGESIZE 8 "
"PRECISION 'ns' "
"REPLICA 3 "
"RETENTIONS 15s:7d,1m:21d,15m:5y "
"RETENTIONS 15s:7d,1m:21d,15m:500d "
"STRICT 1 "
"WAL 2 "
"VGROUPS 100 "
@ -220,6 +220,17 @@ TEST_F(ParserInitialCTest, createDatabase) {
"KEEP 1440m,300h,400d ");
}
TEST_F(ParserInitialCTest, createDatabaseSemanticCheck) {
useDb("root", "test");
run("create database db2 retentions 0s:1d", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
run("create database db2 retentions 10s:0d", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
run("create database db2 retentions 1w:1d", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
run("create database db2 retentions 1w:1n", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
run("create database db2 retentions 15s:7d,15m:21d,10m:500d", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
run("create database db2 retentions 15s:7d,5m:21d,10m:10d", TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
}
TEST_F(ParserInitialCTest, createDnode) {
useDb("root", "test");
@ -434,6 +445,13 @@ TEST_F(ParserInitialCTest, createStable) {
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) FILE_FACTOR 0.1");
}
TEST_F(ParserInitialCTest, createStableSemanticCheck) {
useDb("root", "test");
run("CREATE STABLE stb2 (ts TIMESTAMP, c1 INT) TAGS (tag1 INT) ROLLUP(CEIL) FILE_FACTOR 0.1",
TSDB_CODE_PAR_INVALID_ROLLUP_OPTION, PARSER_STAGE_TRANSLATE);
}
TEST_F(ParserInitialCTest, createStream) {
useDb("root", "test");

View File

@ -65,6 +65,8 @@ TEST_F(ParserSelectTest, condition) {
run("SELECT c1 FROM t1 WHERE ts in (true, false)");
run("SELECT c1 FROM t1 WHERE NOT ts in (true, false)");
run("SELECT * FROM t1 WHERE c1 > 10 and c1 is not null");
}
@ -212,9 +214,11 @@ TEST_F(ParserSelectTest, interval) {
TEST_F(ParserSelectTest, intervalSemanticCheck) {
useDb("root", "test");
run("SELECT c1 FROM t1 INTERVAL(10s)", TSDB_CODE_PAR_NOT_SINGLE_GROUP, PARSER_STAGE_TRANSLATE);
run("SELECT DISTINCT c1, c2 FROM t1 WHERE c1 > 3 INTERVAL(1d) FILL(NEXT)", TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE,
PARSER_STAGE_TRANSLATE);
run("SELECT c1 FROM t1 INTERVAL(10s)", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
run("SELECT DISTINCT c1, c2 FROM t1 WHERE c1 > 3 INTERVAL(1d) FILL(NEXT)", TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
run("SELECT HISTOGRAM(c1, 'log_bin', '{\"start\": -33,\"factor\": 55,\"count\": 5,\"infinity\": false}', 1) FROM t1 "
"WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' INTERVAL(10s) FILL(NULL)",
TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, subquery) {

View File

@ -36,7 +36,7 @@ class ParserTestBase : public testing::Test {
void login(const std::string& user);
void useDb(const std::string& acctId, const std::string& db);
void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_TRANSLATE);
virtual void checkDdl(const SQuery* pQuery, ParserStage stage);

View File

@ -159,7 +159,7 @@ typedef struct SSyncNode {
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver;
SSnapshotMeta sMeta;
// SSnapshotMeta sMeta;
} SSyncNode;
@ -194,7 +194,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);

View File

@ -35,6 +35,7 @@ typedef struct SRaftCfg {
char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy;
int8_t snapshotEnable;
SyncIndex lastConfigIndex;
} SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path);
@ -52,8 +53,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta {
int8_t isStandBy;
int8_t snapshotEnable;
int8_t isStandBy;
int8_t snapshotEnable;
SyncIndex lastConfigIndex;
} SRaftCfgMeta;
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path);

View File

@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender {
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
SSyncCfg lastConfig;
int64_t sendingMS;
SSyncNode *pSyncNode;
int32_t replicaIndex;

View File

@ -88,6 +88,245 @@
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<candidateVars, leaderVars>>
//
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
assert(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths);
}
assert(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL);
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
}
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
}
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
}
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
int32_t code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
}
}
return ret;
}
#if 0
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
@ -375,7 +614,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
if (!isDrop) {
@ -437,6 +676,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return ret;
}
#endif
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t code;

View File

@ -191,14 +191,17 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld"
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, s);
taosMemoryFree(s);
} else {
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm);
"lastApplyTerm:%lu lastConfigIndex:%ld",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex);
}
}

View File

@ -192,6 +192,40 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
return ret;
}
int32_t syncLeaderTransfer(int64_t rid) {
int32_t ret = 0;
return ret;
}
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
assert(rid == pSyncNode->rid);
int32_t ret = 0;
if (pSyncNode->replicaNum == 1) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
sError("only one replica, cannot drop leader");
return TAOS_SYNC_ONLY_ONE_REPLICA;
}
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId;
ASSERT(pMsg != NULL);
SRpcMsg rpcMsg = {0};
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
syncLeaderTransferDestroy(pMsg);
ret = syncPropose(rid, &rpcMsg, false);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
@ -206,6 +240,40 @@ int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg)
return ret;
}
bool syncCanLeaderTransfer(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
assert(rid == pSyncNode->rid);
if (pSyncNode->replicaNum == 1) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return false;
}
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return true;
}
bool matchOK = true;
if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncIndex myCommitIndex = pSyncNode->commitIndex;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
if (peerMatchIndex < myCommitIndex) {
matchOK = false;
}
}
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return matchOK;
}
int32_t syncGiveUpLeader(int64_t rid) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
@ -241,7 +309,9 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
return -1;
}
assert(rid == pSyncNode->rid);
*sMeta = pSyncNode->sMeta;
sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
@ -452,6 +522,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SRaftCfgMeta meta;
meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotEnable = pSyncInfo->snapshotEnable;
meta.lastConfigIndex = SYNC_INDEX_INVALID;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
assert(ret == 0);
@ -643,7 +714,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// syncNodeBecomeFollower(pSyncNode);
// snapshot meta
pSyncNode->sMeta.lastConfigIndex = -1;
// pSyncNode->sMeta.lastConfigIndex = -1;
return pSyncNode;
}
@ -1076,9 +1147,11 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) {
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *newConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
int32_t ret = 0;
// init internal
@ -1111,13 +1184,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
// isDrop
*isDrop = true;
bool IamInOld, IamInNew;
bool IamInOld = false;
bool IamInNew = false;
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
*isDrop = false;
IamInOld = true;
break;
}
}
@ -1125,16 +1197,21 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
*isDrop = false;
IamInNew = true;
break;
}
}
if (!(*isDrop)) {
// change isStandBy to normal
pSyncNode->pRaftCfg->isStandBy = 0;
*isDrop = true;
if (IamInOld && !IamInNew) {
*isDrop = true;
} else {
*isDrop = false;
}
if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
}
raftCfgPersist(pSyncNode->pRaftCfg);
if (gRaftDetailLog) {
@ -1163,7 +1240,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr);
sInfo("sync event vgId:%d become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
@ -1197,7 +1274,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr);
sInfo("sync event vgId:%d become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, debugStr);
// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
@ -1735,23 +1812,79 @@ const char* syncStr(ESyncState state) {
}
}
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer;
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
}
return 0;
}
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool IamInNew = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
bool isDrop;
if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths, "config change");
}
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
SReConfigCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
}
return 0;
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
endIndex, syncUtilState2String(state));
/*
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
*/
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
@ -1764,6 +1897,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
// user commit
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
@ -1780,61 +1914,14 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
break;
}
}
SReConfigCbMeta cbMeta = {0};
bool isDrop;
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths, "config change");
}
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
// config change
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
// restore finish

View File

@ -14,6 +14,7 @@
*/
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "tcoding.h"
@ -75,6 +76,11 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot = syncSnapshotRsp2Json(pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_LEADER_TRANSFER) {
SyncLeaderTransfer* pSyncMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncLeaderTransfer2Json(pSyncMsg);
syncLeaderTransferDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
pRoot = cJSON_CreateObject();
char* s;
@ -1841,6 +1847,10 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", u64buf);
cJSON_AddItemToObject(pRoot, "lastConfig", syncCfg2Json((SSyncCfg*)&(pMsg->lastConfig)));
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm);
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
@ -2055,4 +2065,166 @@ void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
sTrace("syncSnapshotRspLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---------------------------------------------
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncLeaderTransfer);
SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
return pMsg;
}
void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
assert(buf != NULL);
syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes);
assert(pMsg != NULL);
syncLeaderTransferDeserialize(buf, len, pMsg);
assert(len == pMsg->bytes);
return pMsg;
}
void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) {
syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
/*
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
*/
cJSON* pNewerId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newLeaderId.addr);
cJSON_AddStringToObject(pNewerId, "addr", u64buf);
{
uint64_t u64 = pMsg->newLeaderId.addr;
cJSON* pTmp = pNewerId;
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId);
cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot);
return pJson;
}
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) {
cJSON* pJson = syncLeaderTransfer2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg) {
char* serialized = syncLeaderTransfer2Str(pMsg);
printf("syncLeaderTransferPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg) {
char* serialized = syncLeaderTransfer2Str(pMsg);
printf("syncLeaderTransferPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg) {
char* serialized = syncLeaderTransfer2Str(pMsg);
sTrace("syncLeaderTransferLog | len:%lu | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncLeaderTransfer2Str(pMsg);
sTrace("syncLeaderTransferLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}

View File

@ -150,6 +150,10 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
return pJson;
@ -172,6 +176,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN] = {0};
@ -199,6 +204,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);

View File

@ -553,15 +553,19 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
}
void logStoreLog(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
if (gRaftDetailLog) {
char* serialized = logStore2Str(pLogStore);
sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
}
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
if (gRaftDetailLog) {
char* serialized = logStore2Str(pLogStore);
sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// for debug -----------------

View File

@ -15,6 +15,7 @@
#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
@ -83,6 +84,32 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
/*
SSyncRaftEntry *pEntry = NULL;
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry != NULL);
*/
SSyncRaftEntry *pEntry =
pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
ASSERT(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SSyncCfg lastConfig;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig;
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
} else {
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
}
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
@ -97,6 +124,8 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN
pMsg->privateTerm = pSender->privateTerm;
@ -112,15 +141,18 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
syncSnapshotSendDestroy(pMsg);
@ -228,6 +260,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
@ -245,20 +279,25 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
sTrace(
"sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
syncSnapshotSendDestroy(pMsg);
@ -274,6 +313,8 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
@ -352,7 +393,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -473,7 +514,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -497,7 +538,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -540,6 +581,42 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
// update new config myIndex
bool IamInNew = false;
SSyncCfg newSyncCfg = pMsg->lastConfig;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
bool isDrop;
if (IamInNew) {
sTrace("sync event vgId:%d update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
sTrace(
"sync event vgId:%d do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
}
// change isStandBy to normal
if (!isDrop) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, "config change");
} else {
syncNodeBecomeFollower(pSyncNode, "config change");
}
}
}
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);

View File

@ -214,29 +214,31 @@ void syncUtilMsgNtoH(void* msg) {
pHead->vgId = ntohl(pHead->vgId);
}
#if 0
bool syncUtilIsData(tmsg_t msgType) {
if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) {
return false;
}
return true;
}
#endif
bool syncUtilUserPreCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
return false;
}
bool syncUtilUserCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
return false;
}
bool syncUtilUserRollback(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true;
}
return false;

View File

@ -47,6 +47,7 @@ add_executable(syncTestTool "")
add_executable(syncRaftLogTest "")
add_executable(syncRaftLogTest2 "")
add_executable(syncRaftLogTest3 "")
add_executable(syncLeaderTransferTest "")
target_sources(syncTest
@ -245,6 +246,10 @@ target_sources(syncRaftLogTest3
PRIVATE
"syncRaftLogTest3.cpp"
)
target_sources(syncLeaderTransferTest
PRIVATE
"syncLeaderTransferTest.cpp"
)
target_include_directories(syncTest
@ -492,6 +497,11 @@ target_include_directories(syncRaftLogTest3
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncLeaderTransferTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
@ -690,6 +700,10 @@ target_link_libraries(syncRaftLogTest3
sync
gtest_main
)
target_link_libraries(syncLeaderTransferTest
sync
gtest_main
)
enable_testing()

View File

@ -0,0 +1,101 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncLeaderTransfer *createMsg() {
SyncLeaderTransfer *pMsg = syncLeaderTransferBuild(1000);
/*
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
*/
pMsg->newLeaderId.addr = syncUtilAddr2U64("127.0.0.1", 9999);
pMsg->newLeaderId.vgId = 100;
return pMsg;
}
void test1() {
SyncLeaderTransfer *pMsg = createMsg();
syncLeaderTransferLog2((char *)"test1:", pMsg);
syncLeaderTransferDestroy(pMsg);
}
void test2() {
SyncLeaderTransfer *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len);
syncLeaderTransferSerialize(pMsg, serialized, len);
SyncLeaderTransfer *pMsg2 = syncLeaderTransferBuild(1000);
syncLeaderTransferDeserialize(serialized, len, pMsg2);
syncLeaderTransferLog2((char *)"test2: syncLeaderTransferSerialize -> syncLeaderTransferDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncLeaderTransferDestroy(pMsg);
syncLeaderTransferDestroy(pMsg2);
}
void test3() {
SyncLeaderTransfer *pMsg = createMsg();
uint32_t len;
char * serialized = syncLeaderTransferSerialize2(pMsg, &len);
SyncLeaderTransfer *pMsg2 = syncLeaderTransferDeserialize2(serialized, len);
syncLeaderTransferLog2((char *)"test3: syncLeaderTransferSerialize2 -> syncLeaderTransferDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncLeaderTransferDestroy(pMsg);
syncLeaderTransferDestroy(pMsg2);
}
void test4() {
SyncLeaderTransfer *pMsg = createMsg();
SRpcMsg rpcMsg;
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
SyncLeaderTransfer *pMsg2 = (SyncLeaderTransfer *)taosMemoryMalloc(rpcMsg.contLen);
syncLeaderTransferFromRpcMsg(&rpcMsg, pMsg2);
syncLeaderTransferLog2((char *)"test4: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncLeaderTransferDestroy(pMsg);
syncLeaderTransferDestroy(pMsg2);
}
void test5() {
SyncLeaderTransfer *pMsg = createMsg();
SRpcMsg rpcMsg;
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
SyncLeaderTransfer *pMsg2 = syncLeaderTransferFromRpcMsg2(&rpcMsg);
syncLeaderTransferLog2((char *)"test5: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncLeaderTransferDestroy(pMsg);
syncLeaderTransferDestroy(pMsg2);
}
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}

View File

@ -74,6 +74,7 @@ void test3() {
SRaftCfgMeta meta;
meta.isStandBy = 7;
meta.snapshotEnable = 9;
meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
}
@ -98,6 +99,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
pCfg->lastConfigIndex += 1000;
raftCfgPersist(pCfg);
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);

View File

@ -24,6 +24,16 @@ SyncSnapshotSend *createMsg() {
pMsg->privateTerm = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->lastConfigIndex = 99;
pMsg->lastConfig.replicaNum = 3;
pMsg->lastConfig.myIndex = 1;
for (int i = 0; i < pMsg->lastConfig.replicaNum; ++i) {
((pMsg->lastConfig.nodeInfo)[i]).nodePort = i * 100;
snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn),
"100.200.300.%d", i);
}
pMsg->seq = 44;
strcpy(pMsg->data, "hello world");
return pMsg;
@ -87,6 +97,8 @@ void test5() {
}
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();

View File

@ -71,6 +71,7 @@
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim

View File

@ -95,7 +95,7 @@ endi
if $data6_db != 345600 then # days
return -1
endi
if $data7_db != 1440000,1440000,1440000 then # keep
if $data7_db != 1440000m,1440000m,1440000m then # keep
return -1
endi
if $data8_db != 96 then # buffer
@ -232,7 +232,7 @@ print ============== modify keep
sql alter database db keep 2400
sql show databases
print keep $data7_db
if $data7_db != 3456000,3456000,3456000 then
if $data7_db != 3456000m,3456000m,3456000m then
return -1
endi

View File

@ -37,7 +37,7 @@ endi
if $data26 != 2880 then
return -1
endi
if $data27 != 14400,14400,14400 then
if $data27 != 14400m,14400m,14400m then
return -1
endi
#if $data28 != 32 then

View File

@ -116,7 +116,7 @@ endi
if $data6_db != 14400 then # days
return -1
endi
if $data7_db != 5256000,5256000,5256000 then # keep
if $data7_db != 5256000m,5256000m,5256000m then # keep
return -1
endi
if $data8_db != 96 then # buffer

View File

@ -0,0 +1,176 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sleep 1000
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts1 values(1648791233003,3,2,3,2.1);
sql insert into ts2 values(1648791243004,4,2,43,73.1);
sql insert into ts1 values(1648791213002,24,22,23,4.1);
sql insert into ts1 values(1648791243005,4,20,3,3.1);
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
sql insert into ts4 values(1648791213002,24,22,23,4.1);
sql insert into ts3 values(1648791243005,4,20,3,3.1);
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop1:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 8 then
print =====data01=$data01
goto loop1
endi
if $data02 != 4 then
print =====data02=$data02
goto loop1
endi
if $data03 != 4 then
print ======$data03
return -1
endi
if $data04 != 52 then
print ======$data04
return -1
endi
if $data05 != 13 then
print ======$data05
return -1
endi
# row 1
if $data11 != 6 then
print =====data11=$data11
goto loop1
endi
if $data12 != 6 then
print =====data12=$data12
goto loop1
endi
if $data13 != 92 then
print ======$data13
return -1
endi
if $data14 != 22 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print =====data21=$data21
goto loop1
endi
if $data22 != 4 then
print =====data22=$data22
goto loop1
endi
if $data23 != 32 then
print ======$data23
return -1
endi
if $data24 != 12 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 30 then
print =====data31=$data31
goto loop1
endi
if $data32 != 30 then
print =====data32=$data32
goto loop1
endi
if $data33 != 180 then
print ======$data33
return -1
endi
if $data34 != 42 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT