Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

commit c8280e05dd
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
  SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
  SET(TD_VER_NUMBER "3.2.3.0.alpha")
  SET(TD_VER_NUMBER "3.2.4.0.alpha")
ENDIF ()

IF (DEFINED VERCOMPATIBLE)
@@ -842,12 +842,12 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})

In addition to native connections, the client library also supports subscriptions via websockets.

The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/#create-a-consumer).
The syntax for creating a consumer is "consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/#create-a-consumer).

```python
import taosws

consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```

</TabItem>

@@ -887,13 +887,13 @@ The `poll` function is used to consume data in tmq. The parameter of the `poll`

```python
while True:
    res = consumer.poll(1)
    if not res:
    message = consumer.poll(1)
    if not message:
        continue
    err = res.error()
    err = message.error()
    if err is not None:
        raise err
    val = res.value()
    val = message.value()

    for block in val:
        print(block.fetchall())

@@ -902,16 +902,14 @@ while True:
</TabItem>
<TabItem value="websocket" label="WebSocket connection">

The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out.

```python
while True:
    res = consumer.poll(timeout=1.0)
    if not res:
    message = consumer.poll(1)
    if not message:
        continue
    err = res.error()
    if err is not None:
        raise err

    for block in message:
        for row in block:
            print(row)
@@ -41,16 +41,28 @@ window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
}
```

`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically.

`EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` can be any conditional expressions supported by TDengine and can include multiple columns.

`COUNT_WINDOW` is a counting window that is divided by a fixed number of data rows. `count_val`: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. `count_val` represents the maximum number of data rows contained in each `COUNT_WINDOW`. When the total number of data rows cannot be divided by `count_val`, the number of rows in the last window will be less than `count_val`. `sliding_val`: is a constant that represents the number of window slides, similar to `SLIDING` in `INTERVAL`.

For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable.

```sql
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

CREATE STREAM streams0 INTO streamt0 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9;

CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10);
```
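
The examples above do not exercise the optional `sliding_val` argument of `COUNT_WINDOW`. A minimal additional sketch, assuming the same `meters` supertable and hypothetical stream and target-table names (`streams2`, `streamt2`), might look like this:

```sql
-- hypothetical example: a 10-row count window that slides forward every 2 rows
CREATE STREAM streams2 INTO streamt2 AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname COUNT_WINDOW(10, 2);
```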

## Partitions of Stream
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t

import Release from "/components/ReleaseV3";

## 3.2.3.0

<Release type="tdengine" version="3.2.3.0" />

## 3.2.2.0

<Release type="tdengine" version="3.2.2.0" />
@@ -856,7 +856,7 @@ The taosws `Consumer` API provides WebSocket-based APIs for subscribing to TMQ data.
```python
import taosws

consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```

</TabItem>

@@ -896,13 +896,13 @@ The Consumer API's `poll` method is used to consume data; `poll` accepts a float

```python
while True:
    res = consumer.poll(1)
    if not res:
    message = consumer.poll(1)
    if not message:
        continue
    err = res.error()
    err = message.error()
    if err is not None:
        raise err
    val = res.value()
    val = message.value()

    for block in val:
        print(block.fetchall())

@@ -911,16 +911,14 @@ while True:
</TabItem>
<TabItem value="websocket" label="WebSocket connection">

The Consumer API's `poll` method is used to consume data. `poll` accepts a float timeout in seconds; before the timeout it returns one piece of data of type Message, and on timeout it returns `None`. The consumer must check the error information of the returned data via the Message's `error()` method.
The Consumer API's `poll` method is used to consume data. `poll` accepts a float timeout in seconds; before the timeout it returns one piece of data of type Message, and on timeout it returns `None`.

```python
while True:
    res = consumer.poll(timeout=1.0)
    if not res:
    message = consumer.poll(1)
    if not message:
        continue
    err = res.error()
    if err is not None:
        raise err

    for block in message:
        for row in block:
            print(row)
@@ -49,10 +49,14 @@ window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
}
```

Here, SESSION is a session window, and tol_val is the maximum range of the time interval. Data whose time interval falls within tol_val belong to the same window; if the time between two consecutive rows exceeds tol_val, the next window is started automatically.
EVENT_WINDOW is an event window, delimited by a start condition and an end condition. The window starts when start_trigger_condition is satisfied and closes when end_trigger_condition is satisfied. start_trigger_condition and end_trigger_condition can be any conditional expressions supported by TDengine and may involve different columns.
COUNT_WINDOW is a counting window that partitions data by a fixed number of rows. count_val is a constant positive integer, greater than or equal to 2 and less than 2147483648. count_val is the maximum number of data rows contained in each COUNT_WINDOW; when the total number of rows is not divisible by count_val, the last window contains fewer than count_val rows. sliding_val is a constant that indicates the number of rows the window slides, similar to SLIDING in INTERVAL.

The window definitions are exactly the same as in the time-series distinguished queries; see [TDengine Distinguished Query](../distinguished) for details.
@@ -61,6 +65,12 @@ window_clause: {
```sql
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

CREATE STREAM streams0 INTO streamt0 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9;

CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10);
```
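
The optional sliding count can be added to `COUNT_WINDOW` here as well; a minimal sketch with hypothetical stream and target-table names (`streams2`, `streamt2`):

```sql
-- hypothetical: COUNT_WINDOW of 10 rows with a sliding count of 2 rows
CREATE STREAM streams2 INTO streamt2 AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname COUNT_WINDOW(10, 2);
```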

## Partitions of Stream
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://www.taosdata.com/all-do

import Release from "/components/ReleaseV3";

## 3.2.3.0

<Release type="tdengine" version="3.2.3.0" />

## 3.2.2.0

<Release type="tdengine" version="3.2.2.0" />
@@ -219,7 +219,6 @@ extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
extern int32_t tsResolveFQDNRetryTime;
extern bool tsDisableCount;

extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
@@ -581,8 +581,8 @@ typedef struct {
  };
} SSubmitRsp;

int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
// int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
// int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
// void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);

@@ -886,8 +886,8 @@ typedef struct {
  int64_t maxStorage;
} SCreateAcctReq, SAlterAcctReq;

int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
// int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
// int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);

typedef struct {
  char user[TSDB_USER_LEN];

@@ -3447,7 +3447,7 @@ int32_t tDeserializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexR

typedef SMDropSmaReq SDropTagIndexReq;

int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
// int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
int32_t tDeserializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);

typedef struct {

@@ -3568,8 +3568,8 @@ typedef struct {
  int8_t igNotExists;
} SMDropFullTextReq;

int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
// int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
// int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);

typedef struct {
  char indexFName[TSDB_INDEX_FNAME_LEN];
@@ -433,7 +433,7 @@ int32_t* taosGetErrno();

//mnode-compact
#define TSDB_CODE_MND_INVALID_COMPACT_ID TAOS_DEF_ERROR_CODE(0, 0x04B1)

#define TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x04B2)

// vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
@@ -187,6 +187,8 @@ typedef enum ELogicConditionType {
  LOGIC_COND_TYPE_NOT,
} ELogicConditionType;

#define TSDB_INT32_ID_LEN 11

#define TSDB_NAME_DELIMITER_LEN 1

#define TSDB_UNI_LEN 24
@@ -26,6 +26,8 @@ typedef struct SScalableBf {
  SArray *bfArray;  // array of bloom filters
  uint32_t growth;
  uint64_t numBits;
  uint32_t maxBloomFilters;
  int8_t status;
  _hash_fn_t hashFn1;
  _hash_fn_t hashFn2;
} SScalableBf;
@ -452,20 +452,21 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
|
|||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
int32_t newLen = pSource->varmeta.length;
|
||||
memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
|
||||
if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
|
||||
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length);
|
||||
if (pColumnInfoData->varmeta.allocLen < newLen) {
|
||||
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pColumnInfoData->pData = tmp;
|
||||
pColumnInfoData->varmeta.allocLen = pSource->varmeta.length;
|
||||
pColumnInfoData->varmeta.allocLen = newLen;
|
||||
}
|
||||
|
||||
pColumnInfoData->varmeta.length = pSource->varmeta.length;
|
||||
pColumnInfoData->varmeta.length = newLen;
|
||||
if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
|
||||
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
|
||||
memcpy(pColumnInfoData->pData, pSource->pData, newLen);
|
||||
}
|
||||
} else {
|
||||
memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
|
||||
|
@ -1687,7 +1688,29 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
|
|||
}
|
||||
|
||||
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||
if (n >= total || n == 0) return;
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
if (pColInfoData->varmeta.length != 0) {
|
||||
int32_t newLen = pColInfoData->varmeta.offset[n];
|
||||
if (-1 == newLen) {
|
||||
for (int i = n - 1; i >= 0; --i) {
|
||||
newLen = pColInfoData->varmeta.offset[i];
|
||||
if (newLen != -1) {
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
newLen += getJsonValueLen(pColInfoData->pData + newLen);
|
||||
} else {
|
||||
newLen += varDataTLen(pColInfoData->pData + newLen);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (newLen <= -1) {
|
||||
uFatal("colDataKeepFirstNRows: newLen:%d old:%d", newLen, pColInfoData->varmeta.length);
|
||||
} else {
|
||||
pColInfoData->varmeta.length = newLen;
|
||||
}
|
||||
}
|
||||
// pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
|
||||
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
|
||||
}
|
||||
|
|
|
@ -269,7 +269,6 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024;
|
|||
bool tsFilterScalarMode = false;
|
||||
int tsResolveFQDNRetryTime = 100; // seconds
|
||||
int tsStreamAggCnt = 1000;
|
||||
bool tsDisableCount = true;
|
||||
|
||||
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
|
||||
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
|
||||
|
@ -541,8 +540,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "disableCount", tsDisableCount, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1109,8 +1106,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
|
||||
|
||||
tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
|
||||
|
||||
tsDisableCount = cfgGetItem(pCfg, "disableCount")->bval;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1739,8 +1734,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
{"shellActivityTimer", &tsShellActivityTimer},
|
||||
{"slowLogThreshold", &tsSlowLogThreshold},
|
||||
{"useAdapter", &tsUseAdapter},
|
||||
{"experimental", &tsExperimental},
|
||||
{"disableCount", &tsDisableCount}};
|
||||
{"experimental", &tsExperimental}};
|
||||
|
||||
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
|
||||
taosCfgSetOption(options, tListLen(options), pItem, false);
|
||||
|
|
|
@ -1009,19 +1009,19 @@ int32_t tDeserializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexR
|
|||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
// int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, buf, bufLen);
|
||||
// if (tStartEncode(&encoder) < 0) return -1;
|
||||
// tEndEncode(&encoder);
|
||||
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
// int32_t tlen = encoder.pos;
|
||||
// tEncoderClear(&encoder);
|
||||
// return tlen;
|
||||
// }
|
||||
int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
@ -1035,6 +1035,7 @@ int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMCreateFullTextReq(void *buf, int32_t bufLen, SMCreateFullTextReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
@ -1059,32 +1060,32 @@ void tFreeSMCreateFullTextReq(SMCreateFullTextReq *pReq) {
|
|||
// impl later
|
||||
return;
|
||||
}
|
||||
int32_t tSerializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
// int32_t tSerializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
// if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
|
||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||
// tEndEncode(&encoder);
|
||||
// int32_t tlen = encoder.pos;
|
||||
// tEncoderClear(&encoder);
|
||||
// return tlen;
|
||||
// }
|
||||
// int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
|
||||
// SDecoder decoder = {0};
|
||||
// tDecoderInit(&decoder, buf, bufLen);
|
||||
// if (tStartDecode(&decoder) < 0) return -1;
|
||||
// if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
// if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
// tEndDecode(&decoder);
|
||||
// tDecoderClear(&decoder);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
|
@ -1474,44 +1475,44 @@ void tFreeSStatisReq(SStatisReq *pReq) {
|
|||
taosMemoryFreeClear(pReq->pCont);
|
||||
}
|
||||
|
||||
int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
// int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->maxUsers) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->maxDbs) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->maxTimeSeries) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->maxStreams) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->accessState) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->maxStorage) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
// if (tStartEncode(&encoder) < 0) return -1;
|
||||
// if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
|
||||
// if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1;
|
||||
// if (tEncodeI32(&encoder, pReq->maxUsers) < 0) return -1;
|
||||
// if (tEncodeI32(&encoder, pReq->maxDbs) < 0) return -1;
|
||||
// if (tEncodeI32(&encoder, pReq->maxTimeSeries) < 0) return -1;
|
||||
// if (tEncodeI32(&encoder, pReq->maxStreams) < 0) return -1;
|
||||
// if (tEncodeI32(&encoder, pReq->accessState) < 0) return -1;
|
||||
// if (tEncodeI64(&encoder, pReq->maxStorage) < 0) return -1;
|
||||
// tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
// int32_t tlen = encoder.pos;
|
||||
// tEncoderClear(&encoder);
|
||||
// return tlen;
|
||||
// }
|
||||
|
||||
int32_t tDeserializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
// int32_t tDeserializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
|
||||
// SDecoder decoder = {0};
|
||||
// tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->maxUsers) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->maxDbs) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->maxTimeSeries) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->maxStreams) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->accessState) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->maxStorage) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
// if (tStartDecode(&decoder) < 0) return -1;
|
||||
// if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
|
||||
// if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1;
|
||||
// if (tDecodeI32(&decoder, &pReq->maxUsers) < 0) return -1;
|
||||
// if (tDecodeI32(&decoder, &pReq->maxDbs) < 0) return -1;
|
||||
// if (tDecodeI32(&decoder, &pReq->maxTimeSeries) < 0) return -1;
|
||||
// if (tDecodeI32(&decoder, &pReq->maxStreams) < 0) return -1;
|
||||
// if (tDecodeI32(&decoder, &pReq->accessState) < 0) return -1;
|
||||
// if (tDecodeI64(&decoder, &pReq->maxStorage) < 0) return -1;
|
||||
// tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
// tDecoderClear(&decoder);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
int32_t tSerializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
|
@ -5238,11 +5239,11 @@ int32_t tDeserializeSQueryCompactProgressRsp(void *buf, int32_t bufLen, SQueryCo
|
|||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->finished) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -2;
|
||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -3;
|
||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -4;
|
||||
if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -5;
|
||||
if (tDecodeI32(&decoder, &pReq->finished) < 0) return -6;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -7934,64 +7935,64 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
// static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
|
||||
// if (tStartDecode(pDecoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
||||
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||
if (NULL == pBlock->tblFName) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
|
||||
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
|
||||
// if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
|
||||
// if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
||||
// pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||
// if (NULL == pBlock->tblFName) return -1;
|
||||
// if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
|
||||
// if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
|
||||
// if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
|
||||
// if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
|
||||
|
||||
int32_t meta = 0;
|
||||
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
|
||||
if (meta) {
|
||||
pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||
if (NULL == pBlock->pMeta) return -1;
|
||||
if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1;
|
||||
} else {
|
||||
pBlock->pMeta = NULL;
|
||||
}
|
||||
// int32_t meta = 0;
|
||||
// if (tDecodeI32(pDecoder, &meta) < 0) return -1;
|
||||
// if (meta) {
|
||||
// pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||
// if (NULL == pBlock->pMeta) return -1;
|
||||
// if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1;
|
||||
// } else {
|
||||
// pBlock->pMeta = NULL;
|
||||
// }
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
// tEndDecode(pDecoder);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) {
|
||||
int32_t nBlocks = taosArrayGetSize(pRsp->pArray);
|
||||
// int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) {
|
||||
// int32_t nBlocks = taosArrayGetSize(pRsp->pArray);
|
||||
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
// if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
|
||||
if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1;
|
||||
}
|
||||
// if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1;
|
||||
// if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1;
|
||||
// if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1;
|
||||
// for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
|
||||
// if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1;
|
||||
// }
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return 0;
|
||||
}
|
||||
// tEndEncode(pEncoder);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
// int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
|
||||
// if (tStartDecode(pDecoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
|
||||
pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks));
|
||||
if (pRsp->pBlocks == NULL) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
|
||||
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
||||
}
|
||||
// if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
|
||||
// if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
|
||||
// if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
|
||||
// pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks));
|
||||
// if (pRsp->pBlocks == NULL) return -1;
|
||||
// for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
|
||||
// if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
||||
// }
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
tDecoderClear(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
// tEndDecode(pDecoder);
|
||||
// tDecoderClear(pDecoder);
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
// void tFreeSSubmitBlkRsp(void *param) {
|
||||
// if (NULL == param) {
|
||||
|
|
|
@@ -345,6 +345,7 @@ int32_t dmInitClient(SDnode *pDnode) {
  rpcInit.parent = pDnode;
  rpcInit.rfp = rpcRfp;
  rpcInit.compressSize = tsCompressMsgSize;
  rpcInit.dfp = destroyAhandle;

  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
@ -454,7 +454,7 @@ int32_t mndProcessKillCompactReq(SRpcMsg *pReq){
|
|||
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
char obj[MND_COMPACT_ID_LEN] = {0};
|
||||
char obj[TSDB_INT32_ID_LEN] = {0};
|
||||
sprintf(obj, "%d", pCompact->compactId);
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
|
||||
|
@ -490,13 +490,17 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c
|
|||
sdbRelease(pMnode->pSdb, pDetail);
|
||||
}
|
||||
|
||||
return -1;
|
||||
return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
|
||||
}
|
||||
|
||||
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
||||
SQueryCompactProgressRsp req = {0};
|
||||
if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
int32_t code = 0;
|
||||
code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d",
|
||||
code, pReq->pCont, pReq->contLen);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -504,10 +508,10 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
|
|||
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t code = -1;
|
||||
|
||||
|
||||
if(mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req) != 0){
|
||||
code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
|
||||
if(code != 0){
|
||||
terrno = code;
|
||||
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
|
||||
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
|
||||
return -1;
|
||||
|
|
|
@@ -709,7 +709,8 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {

  int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
  if (code != 0) {
    mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
    mGError("vgId:1, failed to process sync msg:%p type:%s, errno: %s, code:0x%x", pMsg, TMSG_INFO(pMsg->msgType),
            terrstr(), code);
  }

  return code;
@ -789,25 +789,6 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_
|
|||
return code;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
|
||||
char *err = NULL;
|
||||
size_t vlen = 0;
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
char *value = NULL;
|
||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(value);
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int idx;
|
||||
SLastKey key;
|
||||
|
@ -1052,6 +1033,25 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
#ifdef BUILD_NO_CALL
|
||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
|
||||
char *err = NULL;
|
||||
size_t vlen = 0;
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
char *value = NULL;
|
||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(value);
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -1233,10 +1233,10 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
|
||||
int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
|
||||
int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
|
||||
SArray* lastTmpColArray = NULL;
|
||||
SArray* lastTmpIndexArray = NULL;
|
||||
SArray* lastrowTmpColArray = NULL;
|
||||
SArray* lastrowTmpIndexArray = NULL;
|
||||
SArray *lastTmpColArray = NULL;
|
||||
SArray *lastTmpIndexArray = NULL;
|
||||
SArray *lastrowTmpColArray = NULL;
|
||||
SArray *lastrowTmpIndexArray = NULL;
|
||||
|
||||
int lastIndex = 0;
|
||||
int lastrowIndex = 0;
|
||||
|
@ -1245,7 +1245,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
SIdxKey *idxKey = taosArrayGet(remainCols, i);
|
||||
slotIds[i] = pr->pSlotIds[idxKey->idx];
|
||||
if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) {
|
||||
if(NULL == lastTmpIndexArray) {
|
||||
if (NULL == lastTmpIndexArray) {
|
||||
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
|
||||
}
|
||||
taosArrayPush(lastTmpIndexArray, &(i));
|
||||
|
@ -1253,7 +1253,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
|
||||
lastIndex++;
|
||||
} else {
|
||||
if(NULL == lastrowTmpIndexArray) {
|
||||
if (NULL == lastrowTmpIndexArray) {
|
||||
lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
|
||||
}
|
||||
taosArrayPush(lastrowTmpIndexArray, &(i));
|
||||
|
@ -1265,17 +1265,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
|
||||
pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
|
||||
|
||||
if(lastTmpIndexArray != NULL) {
|
||||
if (lastTmpIndexArray != NULL) {
|
||||
mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds);
|
||||
for(int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
|
||||
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i));
|
||||
for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
|
||||
taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i));
|
||||
}
|
||||
}
|
||||
|
||||
if(lastrowTmpIndexArray != NULL) {
|
||||
if (lastrowTmpIndexArray != NULL) {
|
||||
mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
|
||||
for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
|
||||
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i));
|
||||
for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
|
||||
taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
|
||||
taosArrayGet(lastrowTmpColArray, i));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@@ -372,8 +372,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code != 0) {
    vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), terrstr());
    vGError("vgId:%d, failed to process sync msg:%p type:%s, errno: %s, code:0x%x", pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), terrstr(), code);
  }

  return code;
@ -94,10 +94,10 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows;) {
|
||||
int32_t step = pInfo->windowSliding;
|
||||
SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow);
|
||||
int32_t prevRows = pBuffInfo->winRows;
|
||||
int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
|
||||
int32_t step = num;
|
||||
if (prevRows == 0) {
|
||||
pInfo->pRow->win.skey = tsCols[i];
|
||||
}
|
||||
|
@ -118,6 +118,8 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
if (prevRows <= pInfo->windowSliding) {
|
||||
if (pBuffInfo->winRows > pInfo->windowSliding) {
|
||||
step = pInfo->windowSliding - prevRows;
|
||||
} else {
|
||||
step = pInfo->windowSliding;
|
||||
}
|
||||
} else {
|
||||
step = 0;
|
||||
|
|
|
@ -1016,10 +1016,6 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (!pCxt->pPlanCxt->streamQuery && tsDisableCount) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pWindow->winType = WINDOW_TYPE_COUNT;
|
||||
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
|
||||
pWindow->node.requireDataOrder =
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#define DEFAULT_MAP_CAPACITY 131072
|
||||
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100)
|
||||
#define ROWS_PER_MILLISECOND 1
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
#define MAX_NUM_SCALABLE_BF 64
|
||||
#define MIN_NUM_SCALABLE_BF 10
|
||||
#define DEFAULT_PREADD_BUCKET 1
|
||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||
|
@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
|||
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
||||
if (watermark <= adjInterval) {
|
||||
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||
}
|
||||
|
||||
if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||
}
|
||||
return watermark;
|
||||
|
|
|
@ -256,21 +256,21 @@ void transAsyncPoolDestroy(SAsyncPool* pool);
|
|||
int transAsyncSend(SAsyncPool* pool, queue* mq);
|
||||
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||
|
||||
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
|
||||
do { \
|
||||
for (int i = 0; i < pool->nAsync; i++) { \
|
||||
uv_async_t* async = &(pool->asyncs[i]); \
|
||||
SAsyncItem* item = async->data; \
|
||||
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
|
||||
tTrace("destroy msg in async pool "); \
|
||||
queue* h = QUEUE_HEAD(&item->qmsg); \
|
||||
QUEUE_REMOVE(h); \
|
||||
msgType* msg = QUEUE_DATA(h, msgType, q); \
|
||||
if (msg != NULL) { \
|
||||
freeFunc(msg); \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
|
||||
do { \
|
||||
for (int i = 0; i < pool->nAsync; i++) { \
|
||||
uv_async_t* async = &(pool->asyncs[i]); \
|
||||
SAsyncItem* item = async->data; \
|
||||
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
|
||||
tTrace("destroy msg in async pool "); \
|
||||
queue* h = QUEUE_HEAD(&item->qmsg); \
|
||||
QUEUE_REMOVE(h); \
|
||||
msgType* msg = QUEUE_DATA(h, msgType, q); \
|
||||
if (msg != NULL) { \
|
||||
freeFunc(msg, param); \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
||||
|
|
|
@ -191,6 +191,15 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
|||
taosMemoryFree(msg->cont);
|
||||
taosMemoryFree(msg);
|
||||
}
|
||||
static void httpDestroyMsgWrapper(void* cont, void* param) {
|
||||
httpDestroyMsg((SHttpMsg*)cont);
|
||||
// if (msg == NULL) return;
|
||||
|
||||
// taosMemoryFree(msg->server);
|
||||
// taosMemoryFree(msg->uri);
|
||||
// taosMemoryFree(msg->cont);
|
||||
// taosMemoryFree(msg);
|
||||
}
|
||||
|
||||
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||
|
@ -554,7 +563,7 @@ void transHttpEnvDestroy() {
|
|||
httpSendQuit();
|
||||
taosThreadJoin(load->thread, NULL);
|
||||
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg);
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
|
||||
transAsyncPoolDestroy(load->asyncPool);
|
||||
uv_loop_close(load->loop);
|
||||
taosMemoryFree(load->loop);
|
||||
|
|
|
@ -219,6 +219,8 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
|
|||
/// NULL,cliHandleUpdate};
|
||||
|
||||
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||
|
||||
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param);
|
||||
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
|
||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
||||
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
|
||||
|
@ -1963,7 +1965,17 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
|
|||
transFreeMsg(pMsg->msg.pCont);
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
|
||||
SCliMsg* pMsg = arg;
|
||||
if (pMsg == NULL) {
|
||||
return;
|
||||
}
|
||||
if (param != NULL) {
|
||||
SCliThrd* pThrd = param;
|
||||
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
||||
if (param == NULL) return;
|
||||
|
||||
|
@ -2057,7 +2069,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
taosThreadJoin(pThrd->thread, NULL);
|
||||
CLI_RELEASE_UV(pThrd->loop);
|
||||
taosThreadMutexDestroy(&pThrd->msgMtx);
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsgWrapper, (void*)pThrd);
|
||||
transAsyncPoolDestroy(pThrd->asyncPool);
|
||||
|
||||
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
|
||||
|
|
|
@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
|
|||
|
||||
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||
|
||||
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||
|
@ -671,7 +671,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
|
|||
transFreeMsg(smsg->msg.pCont);
|
||||
taosMemoryFree(smsg);
|
||||
}
|
||||
static void destroyAllConn(SWorkThrd* pThrd) {
|
||||
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
|
||||
static void destroyAllConn(SWorkThrd* pThrd) {
|
||||
tTrace("thread %p destroy all conn ", pThrd);
|
||||
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
||||
queue* h = QUEUE_HEAD(&pThrd->conn);
|
||||
|
@ -1394,7 +1395,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
|
|||
}
|
||||
taosThreadJoin(pThrd->thread, NULL);
|
||||
SRV_RELEASE_UV(pThrd->loop);
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
|
||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
|
||||
transAsyncPoolDestroy(pThrd->asyncPool);
|
||||
|
||||
uvWhiteListDestroy(pThrd->pWhiteList);
|
||||
|
|
|
@@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_NOT_EXIST, "view not exists in db

//mnode-compact
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COMPACT_ID, "Invalid compact id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST, "compact detail doesn't exist")

// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")
@ -20,6 +20,9 @@
|
|||
|
||||
#define DEFAULT_GROWTH 2
|
||||
#define DEFAULT_TIGHTENING_RATIO 0.5
|
||||
#define DEFAULT_MAX_BLOOMFILTERS 4
|
||||
#define SBF_INVALID -1
|
||||
#define SBF_VALID 0
|
||||
|
||||
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
|
||||
|
||||
|
@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
|||
if (pSBf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
|
||||
pSBf->status = SBF_VALID;
|
||||
pSBf->numBits = 0;
|
||||
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
|
||||
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
|
||||
|
@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
|||
}
|
||||
|
||||
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||
if (pSBf->status == SBF_INVALID) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||
ASSERT(pNormalBf);
|
||||
|
@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
|
|||
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||
if (pNormalBf == NULL) {
|
||||
pSBf->status = SBF_INVALID;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
|
|||
}
|
||||
|
||||
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||
if (pSBf->status == SBF_INVALID) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||
|
@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
|||
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||
if (pNormalBf == NULL) {
|
||||
pSBf->status = SBF_INVALID;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
|||
}
|
||||
|
||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||
if (pSBf->status == SBF_INVALID) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||
|
@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
|
|||
}
|
||||
|
||||
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
|
||||
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
|
||||
if (pNormalBf == NULL) {
|
||||
return NULL;
|
||||
|
@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
|
|||
}
|
||||
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
|
||||
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
|
||||
if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
|
|||
}
|
||||
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
|
||||
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
|
||||
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
|
||||
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
|
||||
return pSBf;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -238,7 +238,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db-removewal.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb-removewal.py -N 6 -n 3
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1
|
||||
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3
|
||||
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3
|
||||
|
@ -1085,6 +1086,7 @@
|
|||
,,y,script,./test.sh -f tsim/parser/join_multivnode.sim
|
||||
,,y,script,./test.sh -f tsim/parser/join.sim
|
||||
,,y,script,./test.sh -f tsim/parser/last_cache.sim
|
||||
,,y,script,./test.sh -f tsim/parser/last_both.sim
|
||||
,,y,script,./test.sh -f tsim/parser/last_groupby.sim
|
||||
,,y,script,./test.sh -f tsim/parser/lastrow.sim
|
||||
,,y,script,./test.sh -f tsim/parser/lastrow2.sim
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
print ======================== dnode1 start
|
||||
$db = testdb
|
||||
sql drop database if exists $db
|
||||
sql create database $db cachemodel 'both' minrows 10 stt_trigger 1
|
||||
sql use $db
|
||||
|
||||
sql create stable st2 (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) tags (id int)
|
||||
sql create table tb1 using st2 tags (1);
|
||||
sql create table tb2 using st2 tags (2);
|
||||
sql create table tb3 using st2 tags (3);
|
||||
sql create table tb4 using st2 tags (4);
|
||||
sql create table tb5 using st2 tags (1);
|
||||
sql create table tb6 using st2 tags (2);
|
||||
sql create table tb7 using st2 tags (3);
|
||||
sql create table tb8 using st2 tags (4);
|
||||
sql create table tb9 using st2 tags (5);
|
||||
sql create table tba using st2 tags (5);
|
||||
sql create table tbb using st2 tags (5);
|
||||
sql create table tbc using st2 tags (5);
|
||||
sql create table tbd using st2 tags (5);
|
||||
sql create table tbe using st2 tags (5);
|
||||
sql create table tbf using st2 tags (5);
|
||||
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.000",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.001",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.002",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.003",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.004",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.005",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.006",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.007",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.008",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.009",28, 29, '30', -1005)
|
||||
sql delete from tb9 where ts = "2021-05-09 10:12:26.000"
|
||||
sql flush database $db
|
||||
|
||||
sql insert into tb1 values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
|
||||
sql insert into tb1 values ("2021-05-10 10:10:11", 4, 5.0, NULL, -2000)
|
||||
sql insert into tb1 values ("2021-05-12 10:10:12", 6,NULL, NULL, -3000)
|
||||
|
||||
sql insert into tb2 values ("2021-05-09 10:11:13",-1,-2.0,'-3', -1001)
|
||||
sql insert into tb2 values ("2021-05-10 10:11:14",-4,-5.0, NULL, -2001)
|
||||
sql insert into tb2 values ("2021-05-11 10:11:15",-6, -7, '-8', -3001)
|
||||
|
||||
sql insert into tb3 values ("2021-05-09 10:12:17", 7, 8.0, '9' , -1002)
|
||||
sql insert into tb3 values ("2021-05-09 10:12:17",10,11.0, NULL, -2002)
|
||||
sql insert into tb3 values ("2021-05-09 10:12:18",12,NULL, NULL, -3002)
|
||||
|
||||
sql insert into tb4 values ("2021-05-09 10:12:19",13,14.0,'15' , -1003)
|
||||
sql insert into tb4 values ("2021-05-10 10:12:20",16,17.0, NULL, -2003)
|
||||
sql insert into tb4 values ("2021-05-11 10:12:21",18,NULL, NULL, -3003)
|
||||
|
||||
sql insert into tb5 values ("2021-05-09 10:12:22",19, 20, '21', -1004)
|
||||
sql insert into tb6 values ("2021-05-11 10:12:23",22, 23, NULL, -2004)
|
||||
sql insert into tb7 values ("2021-05-10 10:12:24",24,NULL, '25', -3004)
|
||||
sql insert into tb8 values ("2021-05-11 10:12:25",26,NULL, '27', -4004)
|
||||
|
||||
sql insert into tba values ("2021-05-10 10:12:27",31, 32, NULL, -2005)
|
||||
sql insert into tbb values ("2021-05-10 10:12:28",33,NULL, '35', -3005)
|
||||
sql insert into tbc values ("2021-05-11 10:12:29",36, 37, NULL, -4005)
|
||||
sql insert into tbd values ("2021-05-11 10:12:29",NULL,NULL,NULL,NULL )
|
||||
|
||||
sql drop table tbf;
|
||||
sql alter table st2 add column c1 int;
|
||||
sql alter table st2 drop column c1;
|
||||
|
||||
run tsim/parser/last_cache_query.sim
|
||||
|
||||
sql flush database $db
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
run tsim/parser/last_cache_query.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
sql drop database if exists $db
|
||||
sql create database $db minrows 10 stt_trigger 1
|
||||
sql use $db
|
||||
|
||||
sql create stable st2 (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) tags (id int)
|
||||
sql create table tb1 using st2 tags (1);
|
||||
sql create table tb2 using st2 tags (2);
|
||||
sql create table tb3 using st2 tags (3);
|
||||
sql create table tb4 using st2 tags (4);
|
||||
sql create table tb5 using st2 tags (1);
|
||||
sql create table tb6 using st2 tags (2);
|
||||
sql create table tb7 using st2 tags (3);
|
||||
sql create table tb8 using st2 tags (4);
|
||||
sql create table tb9 using st2 tags (5);
|
||||
sql create table tba using st2 tags (5);
|
||||
sql create table tbb using st2 tags (5);
|
||||
sql create table tbc using st2 tags (5);
|
||||
sql create table tbd using st2 tags (5);
|
||||
sql create table tbe using st2 tags (5);
|
||||
sql create table tbf using st2 tags (5);
|
||||
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.000",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.001",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.002",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.003",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.004",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.005",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.006",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.007",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.008",28, 29, '30', -1005)
|
||||
sql insert into tb9 values ("2021-05-09 10:12:26.009",28, 29, '30', -1005)
|
||||
sql delete from tb9 where ts = "2021-05-09 10:12:26.000"
|
||||
sql flush database $db
|
||||
|
||||
sql insert into tb1 values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
|
||||
sql insert into tb1 values ("2021-05-10 10:10:11", 4, 5.0, NULL, -2000)
|
||||
sql insert into tb1 values ("2021-05-12 10:10:12", 6,NULL, NULL, -3000)
|
||||
|
||||
sql insert into tb2 values ("2021-05-09 10:11:13",-1,-2.0,'-3', -1001)
|
||||
sql insert into tb2 values ("2021-05-10 10:11:14",-4,-5.0, NULL, -2001)
|
||||
sql insert into tb2 values ("2021-05-11 10:11:15",-6, -7, '-8', -3001)
|
||||
|
||||
sql insert into tb3 values ("2021-05-09 10:12:17", 7, 8.0, '9' , -1002)
|
||||
sql insert into tb3 values ("2021-05-09 10:12:17",10,11.0, NULL, -2002)
|
||||
sql insert into tb3 values ("2021-05-09 10:12:18",12,NULL, NULL, -3002)
|
||||
|
||||
sql insert into tb4 values ("2021-05-09 10:12:19",13,14.0,'15' , -1003)
|
||||
sql insert into tb4 values ("2021-05-10 10:12:20",16,17.0, NULL, -2003)
|
||||
sql insert into tb4 values ("2021-05-11 10:12:21",18,NULL, NULL, -3003)
|
||||
|
||||
sql insert into tb5 values ("2021-05-09 10:12:22",19, 20, '21', -1004)
|
||||
sql insert into tb6 values ("2021-05-11 10:12:23",22, 23, NULL, -2004)
|
||||
sql insert into tb7 values ("2021-05-10 10:12:24",24,NULL, '25', -3004)
|
||||
sql insert into tb8 values ("2021-05-11 10:12:25",26,NULL, '27', -4004)
|
||||
|
||||
sql insert into tba values ("2021-05-10 10:12:27",31, 32, NULL, -2005)
|
||||
sql insert into tbb values ("2021-05-10 10:12:28",33,NULL, '35', -3005)
|
||||
sql insert into tbc values ("2021-05-11 10:12:29",36, 37, NULL, -4005)
|
||||
sql insert into tbd values ("2021-05-11 10:12:29",NULL,NULL,NULL,NULL )
|
||||
|
||||
sql drop table tbf
|
||||
sql alter database $db cachemodel 'both'
|
||||
sql alter database $db cachesize 2
|
||||
sleep 11000
|
||||
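# cachemodel 'both' enables caching of both last_row and last values and cachesize sets the per-vnode cache to 2 MB; the 11 s sleep leaves time for the cache to be rebuilt before the query script runs (assumed)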
|
||||
run tsim/parser/last_cache_query.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -357,6 +357,112 @@ if $data45 != 5 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select last_row(*), id from st2 group by id order by id
|
||||
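# expect one result row per tag id; the checks below compare each group's newest row with the values inserted above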
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
|
||||
print ===> $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
|
||||
print ===> $data30 $data31 $data32 $data33 $data34 $data35 $data36 $data37 $data38 $data39
|
||||
print ===> $data40 $data41 $data42 $data43 $data44 $data45 $data46 $data47 $data48 $data49
|
||||
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @21-05-12 10:10:12.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 6 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != NULL then
|
||||
print $data02
|
||||
return -1
|
||||
endi
|
||||
if $data03 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != @70-01-01 07:59:57.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @21-05-11 10:12:23.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 22 then
|
||||
return -1
|
||||
endi
|
||||
if $data12 != 23.000000000 then
|
||||
print $data12
|
||||
return -1
|
||||
endi
|
||||
if $data13 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data14 != @70-01-01 07:59:58.-04@ then
|
||||
return -1
|
||||
endi
|
||||
if $data15 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @21-05-10 10:12:24.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 24 then
|
||||
return -1
|
||||
endi
|
||||
if $data22 != NULL then
|
||||
print expect NULL actual: $data22
|
||||
return -1
|
||||
endi
|
||||
if $data23 != 25 then
|
||||
return -1
|
||||
endi
|
||||
if $data24 != @70-01-01 07:59:57.-04@ then
|
||||
return -1
|
||||
endi
|
||||
if $data25 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @21-05-11 10:12:25.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 26 then
|
||||
return -1
|
||||
endi
|
||||
if $data32 != NULL then
|
||||
print $data32
|
||||
return -1
|
||||
endi
|
||||
if $data33 != 27 then
|
||||
return -1
|
||||
endi
|
||||
if $data34 != @70-01-01 07:59:56.-04@ then
|
||||
return -1
|
||||
endi
|
||||
if $data35 != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data40 != @21-05-11 10:12:29.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data41 != 36 then
|
||||
return -1
|
||||
endi
|
||||
if $data42 != 37.000000000 then
|
||||
print $data42
|
||||
return -1
|
||||
endi
|
||||
if $data43 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data44 != @70-01-01 07:59:56.-05@ then
|
||||
return -1
|
||||
endi
|
||||
if $data45 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print "test tbn"
|
||||
sql create table if not exists tbn (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp)
|
||||
sql insert into tbn values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
|
||||
|
@ -386,3 +492,5 @@ if $data04 != @70-01-01 07:59:57.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table tbn add column c1 int;
|
||||
sql alter table tbn drop column c1;
|
||||
|
|
|
@ -9,8 +9,6 @@ print =============== create database
|
|||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql alter local 'disableCount' '0';
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
||||
sql insert into t1 values(1648791213000,0,1,1,1.0);
|
||||
|
|
|
@ -9,8 +9,6 @@ print =============== create database
|
|||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
sql alter local 'disableCount' '0';
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
|
|
@ -9,8 +9,6 @@ print =============== create database
|
|||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql alter local 'disableCount' '0';
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
||||
sql insert into t1 values(1648791213000,0,1,1,1.0);
|
||||
|
|
|
@ -122,135 +122,7 @@ class TDTestCase:
|
|||
tdLog.debug(f"redistributeSql:{redistributeSql}")
|
||||
tdSql.query(redistributeSql)
|
||||
tdLog.debug("redistributeSql ok")
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 60,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
topicNameList = ['topic1']
|
||||
# expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
# tdSql.query(queryString)
|
||||
# expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("create ctb1")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
#restart dnode & remove wal
|
||||
self.restartAndRemoveWal()
|
||||
|
||||
# redistribute vgroup
|
||||
self.redistributeVgroups()
|
||||
|
||||
tdLog.info("create ctb2")
|
||||
paraDict['ctbPrefix'] = "ctbn"
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
pInsertThread.join()
|
||||
pInsertThread1.join()
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
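# expectrowcnt covers both insert rounds; since the wal of the vgroup was wiped mid-test, only at least half of the rows are required to reach the consumer (presumed rationale for the check)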
if expectrowcnt / 2 > resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
paraDict = {'dbName':'dbt'}
|
||||
|
||||
ntbName = "ntb"
|
||||
|
||||
topicNameList = ['topic2']
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName)
|
||||
tdLog.info("create nomal table sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
tdLog.info("create topics from nomal table")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName)
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
#restart dnode & remove wal
|
||||
self.restartAndRemoveWal()
|
||||
|
||||
# redistribute vgroup
|
||||
self.redistributeVgroups()
|
||||
|
||||
sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName)
|
||||
tdLog.info("alter table sql: %s"%sqlString)
|
||||
tdSql.error(sqlString)
|
||||
expectRows = 0
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
time.sleep(1)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
|
||||
def tmqCase3(self):
|
||||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
|
@ -330,12 +202,90 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||
|
||||
def tmqCaseDbname(self):
|
||||
tdLog.printNoPrefix("======== test case 4 subscrib Dbname start: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stbn',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 10,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
topicNameList = ['topic4']
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("create topics from database ")
|
||||
queryString = "database %s "%(paraDict['dbName'])
|
||||
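# "create topic ... as database dbt" subscribes to every table in the database rather than to a single query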
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
time.sleep(1)
|
||||
# restart dnode & remove wal
|
||||
self.restartAndRemoveWal()
|
||||
|
||||
# redistribute vgroup
|
||||
self.redistributeVgroups()
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
expectRows = 2
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
time.sleep(6)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 4 subscrib Dbname end ...... ")
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase3()
|
||||
self.prepareTestEnv()
|
||||
self.tmqCaseDbname()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
|
@ -0,0 +1,266 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
import math
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
from util.cluster import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 1
|
||||
self.ctbNum = 10
|
||||
self.rowsPerTbl = 1000
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def getDataPath(self):
|
||||
selfPath = tdCom.getBuildPath()
|
||||
|
||||
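# wal directory of one vnode under the sim data tree; the caller fills in the two %d placeholders with the dnode id and vnode id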
return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 60,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tdCom.drop_all_db()
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
# tdLog.info("create ctb")
|
||||
# tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
# ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
# tdLog.info("insert data")
|
||||
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# tdDnodes.start(1)
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
def restartAndRemoveWal(self):
|
||||
tdDnodes = cluster.dnodes
|
||||
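# find the dnode/vnode pair that hosts database 'dbt', stop that dnode, wipe its wal directory, then start it again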
tdSql.query("select * from information_schema.ins_vnodes")
|
||||
for result in tdSql.queryResult:
|
||||
if result[2] == 'dbt':
|
||||
tdLog.debug("dnode is %d"%(result[0]))
|
||||
dnodeId = result[0]
|
||||
vnodeId = result[1]
|
||||
|
||||
tdDnodes[dnodeId - 1].stoptaosd()
|
||||
time.sleep(1)
|
||||
dataPath = self.getDataPath()
|
||||
dataPath = dataPath%(dnodeId,vnodeId)
|
||||
os.system('rm -rf ' + dataPath)
|
||||
tdLog.debug("dataPath:%s"%dataPath)
|
||||
tdDnodes[dnodeId - 1].starttaosd()
|
||||
time.sleep(1)
|
||||
break
|
||||
tdLog.debug("restart dnode ok")
|
||||
|
||||
def redistributeVgroups(self):
|
||||
dnodesList = []
|
||||
tdSql.query("show dnodes")
|
||||
for result in tdSql.queryResult:
|
||||
dnodesList.append(result[0])
|
||||
print("dnodeList:",dnodesList)
|
||||
tdSql.query("select * from information_schema.ins_vnodes")
|
||||
vnodeId = 0
|
||||
for result in tdSql.queryResult:
|
||||
if result[2] == 'dbt':
|
||||
tdLog.debug("dnode is %d"%(result[0]))
|
||||
dnodesList.remove(result[0])
|
||||
vnodeId = result[1]
|
||||
print("its all data",dnodesList)
|
||||
# if self.replicaVar == 1:
|
||||
# redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0])
|
||||
# else:
|
||||
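# move the vgroup of 'dbt' onto the dnodes that do not currently host it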
redistributeSql = f"redistribute vgroup {vnodeId} "
|
||||
for vgdnode in dnodesList:
|
||||
redistributeSql += f"dnode {vgdnode} "
|
||||
print(redistributeSql)
|
||||
|
||||
tdLog.debug(f"redistributeSql:{redistributeSql}")
|
||||
tdSql.query(redistributeSql)
|
||||
tdLog.debug("redistributeSql ok")
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 60,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
topicNameList = ['topic1']
|
||||
# expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
# tdSql.query(queryString)
|
||||
# expectRowsList.append(tdSql.getRows())
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
tdLog.info("create ctb1")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
#restart dnode & remove wal
|
||||
self.restartAndRemoveWal()
|
||||
|
||||
# redistribute vgroup
|
||||
self.redistributeVgroups()
|
||||
|
||||
tdLog.info("create ctb2")
|
||||
paraDict['ctbPrefix'] = "ctbn"
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||
pInsertThread.join()
|
||||
pInsertThread1.join()
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
if expectrowcnt / 2 > resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
paraDict = {'dbName':'dbt'}
|
||||
|
||||
ntbName = "ntb"
|
||||
|
||||
topicNameList = ['topic2']
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName)
|
||||
tdLog.info("create nomal table sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
tdLog.info("create topics from nomal table")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName)
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
#restart dnode & remove wal
|
||||
self.restartAndRemoveWal()
|
||||
|
||||
# redistribute vgroup
|
||||
self.redistributeVgroups()
|
||||
|
||||
sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName)
|
||||
tdLog.info("alter table sql: %s"%sqlString)
|
||||
tdSql.error(sqlString)
|
||||
expectRows = 0
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
time.sleep(1)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -75,7 +75,7 @@ bool shellIsEmptyCommand(const char *cmd) {
|
|||
|
||||
int32_t shellRunSingleCommand(char *command) {
|
||||
shellCmdkilled = false;
|
||||
|
||||
|
||||
if (shellIsEmptyCommand(command)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -1019,7 +1019,7 @@ void shellReadHistory() {
|
|||
|
||||
char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1);
|
||||
int32_t read_size = 0;
|
||||
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) {
|
||||
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) > 0) {
|
||||
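// a positive read size keeps line[read_size - 1] below inside the buffer; presumably why the earlier != -1 check was replaced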
line[read_size - 1] = '\0';
|
||||
taosMemoryFree(pHistory->hist[pHistory->hend]);
|
||||
pHistory->hist[pHistory->hend] = taosStrdup(line);
|
||||
|
@ -1315,7 +1315,7 @@ int32_t shellExecute() {
|
|||
shellSetConn(shell.conn, runOnce);
|
||||
shellReadHistory();
|
||||
|
||||
if(shell.args.is_bi_mode) {
|
||||
if(shell.args.is_bi_mode) {
|
||||
// need set bi mode
|
||||
printf("Set BI mode is true.\n");
|
||||
#ifndef WEBSOCKET
|
||||
|
|