Merge branch '3.0' of https://github.com/taosdata/TDengine into test/3.0/TD-28844-1

This commit is contained in:
chenhaoran 2024-03-04 09:03:09 +08:00
commit 232bd60a23
39 changed files with 643 additions and 280 deletions

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "3.2.3.0.alpha") SET(TD_VER_NUMBER "3.2.4.0.alpha")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)

View File

@ -842,12 +842,12 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
In addition to native connections, the client library also supports subscriptions via websockets. In addition to native connections, the client library also supports subscriptions via websockets.
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/#create-a-consumer). The syntax for creating a consumer is "consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/#create-a-consumer).
```python ```python
import taosws import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
``` ```
</TabItem> </TabItem>
@ -887,13 +887,13 @@ The `poll` function is used to consume data in tmq. The parameter of the `poll`
```python ```python
while True: while True:
res = consumer.poll(1) message = consumer.poll(1)
if not res: if not message:
continue continue
err = res.error() err = message.error()
if err is not None: if err is not None:
raise err raise err
val = res.value() val = message.value()
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
@ -902,16 +902,14 @@ while True:
</TabItem> </TabItem>
<TabItem value="websocket" label="WebSocket connection"> <TabItem value="websocket" label="WebSocket connection">
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out.
```python ```python
while True: while True:
res = consumer.poll(timeout=1.0) message = consumer.poll(1)
if not res: if not message:
continue continue
err = res.error()
if err is not None:
raise err
for block in message: for block in message:
for row in block: for row in block:
print(row) print(row)

View File

@ -41,16 +41,28 @@ window_clause: {
SESSION(ts_col, tol_val) SESSION(ts_col, tol_val)
| STATE_WINDOW(col) | STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
} }
``` ```
`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically. `SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically.
`EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` can be any conditional expressions supported by TDengine and can include multiple columns.
`COUNT_WINDOW` is a counting window that is divided by a fixed number of data rows.`count_val`: A constant, which is a positive integer and must be greater than or equal to 2. The maximum value is 2147483648. `count_val` represents the maximum number of data rows contained in each `COUNT_WINDOW`. When the total number of data rows cannot be divided by `count_val`, the number of rows in the last window will be less than `count_val`. `sliding_val`: is a constant that represents the number of window slides, similar to `SLIDING` in `INTERVAL`.
For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable. For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream has a 1 minute time window that slides forward in 30 second intervals to calculate the average voltage of the meters supertable.
```sql ```sql
CREATE STREAM avg_vol_s INTO avg_vol AS CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
CREATE STREAM streams0 INTO streamt0 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9;
CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10);
``` ```
## Partitions of Stream ## Partitions of Stream

View File

@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.2.3.0
<Release type="tdengine" version="3.2.3.0" />
## 3.2.2.0 ## 3.2.2.0
<Release type="tdengine" version="3.2.2.0" /> <Release type="tdengine" version="3.2.2.0" />

View File

@ -856,7 +856,7 @@ taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。创
```python ```python
import taosws import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
``` ```
</TabItem> </TabItem>
@ -896,13 +896,13 @@ Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 flo
```python ```python
while True: while True:
res = consumer.poll(1) message = consumer.poll(1)
if not res: if not message:
continue continue
err = res.error() err = message.error()
if err is not None: if err is not None:
raise err raise err
val = res.value() val = message.value()
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
@ -911,16 +911,14 @@ while True:
</TabItem> </TabItem>
<TabItem value="websocket" label="WebSocket 连接"> <TabItem value="websocket" label="WebSocket 连接">
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间超时时间单位为秒s`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间超时时间单位为秒s`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。
```python ```python
while True: while True:
res = consumer.poll(timeout=1.0) message = consumer.poll(1)
if not res: if not message:
continue continue
err = res.error()
if err is not None:
raise err
for block in message: for block in message:
for row in block: for row in block:
print(row) print(row)

View File

@ -49,10 +49,14 @@ window_clause: {
SESSION(ts_col, tol_val) SESSION(ts_col, tol_val)
| STATE_WINDOW(col) | STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| COUNT_WINDOW(count_val[, sliding_val])
} }
``` ```
其中SESSION 是会话窗口tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val则自动开启下一个窗口。 其中SESSION 是会话窗口tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val则自动开启下一个窗口。
EVENT_WINDOW 是事件窗口,根据开始条件和结束条件来划定窗口。当 start_trigger_condition 满足时则窗口开始,直到 end_trigger_condition 满足时窗口关闭。 start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。
COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_val 是常量是正整数必须大于等于2小于2147483648。 count_val 表示每个 COUNT_WINDOW 包含的最大数据行数,总数据行数不能整除 count_val 时,最后一个窗口的行数会小于 count_val 。 sliding_val 是常量,表示窗口滑动的数量,类似于 INTERVAL 的 SLIDING 。
窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
@ -61,6 +65,12 @@ window_clause: {
```sql ```sql
CREATE STREAM avg_vol_s INTO avg_vol AS CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
CREATE STREAM streams0 INTO streamt0 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9;
CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10);
``` ```
## 流式计算的 partition ## 流式计算的 partition

View File

@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.2.3.0
<Release type="tdengine" version="3.2.3.0" />
## 3.2.2.0 ## 3.2.2.0
<Release type="tdengine" version="3.2.2.0" /> <Release type="tdengine" version="3.2.2.0" />

View File

@ -219,7 +219,6 @@ extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache; extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold; extern int32_t tsPQSortMemThreshold;
extern int32_t tsResolveFQDNRetryTime; extern int32_t tsResolveFQDNRetryTime;
extern bool tsDisableCount;
extern bool tsExperimental; extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -581,8 +581,8 @@ typedef struct {
}; };
} SSubmitRsp; } SSubmitRsp;
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); // int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); // int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
// void tFreeSSubmitBlkRsp(void* param); // void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp); void tFreeSSubmitRsp(SSubmitRsp* pRsp);
@ -885,8 +885,8 @@ typedef struct {
int64_t maxStorage; int64_t maxStorage;
} SCreateAcctReq, SAlterAcctReq; } SCreateAcctReq, SAlterAcctReq;
int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq); // int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq); // int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
@ -3446,7 +3446,7 @@ int32_t tDeserializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexR
typedef SMDropSmaReq SDropTagIndexReq; typedef SMDropSmaReq SDropTagIndexReq;
int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq); // int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
int32_t tDeserializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq); int32_t tDeserializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
typedef struct { typedef struct {
@ -3567,8 +3567,8 @@ typedef struct {
int8_t igNotExists; int8_t igNotExists;
} SMDropFullTextReq; } SMDropFullTextReq;
int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq); // int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq); // int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
typedef struct { typedef struct {
char indexFName[TSDB_INDEX_FNAME_LEN]; char indexFName[TSDB_INDEX_FNAME_LEN];

View File

@ -433,7 +433,7 @@ int32_t* taosGetErrno();
//mnode-compact //mnode-compact
#define TSDB_CODE_MND_INVALID_COMPACT_ID TAOS_DEF_ERROR_CODE(0, 0x04B1) #define TSDB_CODE_MND_INVALID_COMPACT_ID TAOS_DEF_ERROR_CODE(0, 0x04B1)
#define TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x04B2)
// vnode // vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x

View File

@ -187,6 +187,8 @@ typedef enum ELogicConditionType {
LOGIC_COND_TYPE_NOT, LOGIC_COND_TYPE_NOT,
} ELogicConditionType; } ELogicConditionType;
#define TSDB_INT32_ID_LEN 11
#define TSDB_NAME_DELIMITER_LEN 1 #define TSDB_NAME_DELIMITER_LEN 1
#define TSDB_UNI_LEN 24 #define TSDB_UNI_LEN 24

View File

@ -26,6 +26,8 @@ typedef struct SScalableBf {
SArray *bfArray; // array of bloom filters SArray *bfArray; // array of bloom filters
uint32_t growth; uint32_t growth;
uint64_t numBits; uint64_t numBits;
uint32_t maxBloomFilters;
int8_t status;
_hash_fn_t hashFn1; _hash_fn_t hashFn1;
_hash_fn_t hashFn2; _hash_fn_t hashFn2;
} SScalableBf; } SScalableBf;

View File

@ -354,6 +354,7 @@ SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid); int32_t releaseRequest(int64_t rid);
int32_t removeRequest(int64_t rid); int32_t removeRequest(int64_t rid);
void doDestroyRequest(void* p); void doDestroyRequest(void* p);
int64_t removeFromMostPrevReq(SRequestObj* pRequest);
char* getDbOfConnection(STscObj* pObj); char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db); void setConnectionDB(STscObj* pTscObj, const char* db);

View File

@ -385,6 +385,33 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
/// return the most previous req ref id
int64_t removeFromMostPrevReq(SRequestObj* pRequest) {
int64_t mostPrevReqRefId = pRequest->self;
SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
pTmp = acquireRequest(pTmp->relation.prevRefId);
if (pTmp) {
mostPrevReqRefId = pTmp->self;
releaseRequest(mostPrevReqRefId);
} else {
break;
}
}
removeRequest(mostPrevReqRefId);
return mostPrevReqRefId;
}
void destroyNextReq(int64_t nextRefId) {
if (nextRefId) {
SRequestObj* pObj = acquireRequest(nextRefId);
if (pObj) {
releaseRequest(nextRefId);
releaseRequest(nextRefId);
}
}
}
void destroySubRequests(SRequestObj *pRequest) { void destroySubRequests(SRequestObj *pRequest) {
int32_t reqIdx = -1; int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL}; SRequestObj *pReqList[16] = {NULL};
@ -435,7 +462,7 @@ void doDestroyRequest(void *p) {
uint64_t reqId = pRequest->requestId; uint64_t reqId = pRequest->requestId;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroySubRequests(pRequest); int64_t nextReqRefId = pRequest->relation.nextRefId;
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
@ -471,6 +498,7 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest); taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroyNextReq(nextReqRefId);
} }
void destroyRequest(SRequestObj *pRequest) { void destroyRequest(SRequestObj *pRequest) {
@ -479,7 +507,7 @@ void destroyRequest(SRequestObj *pRequest) {
} }
taos_stop_query(pRequest); taos_stop_query(pRequest);
removeRequest(pRequest->self); removeFromMostPrevReq(pRequest);
} }
void taosStopQueryImpl(SRequestObj *pRequest) { void taosStopQueryImpl(SRequestObj *pRequest) {

View File

@ -1254,54 +1254,34 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
int32_t reqIdx = 0; tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
SRequestObj *pReqList[16] = {NULL}; SRequestObj* pUserReq = pRequest;
SRequestObj *pUserReq = NULL; acquireRequest(pRequest->self);
pReqList[0] = pRequest; while (pUserReq) {
uint64_t tmpRefId = 0; if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
SRequestObj *pTmp = pRequest;
while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break; break;
}
}
tmpRefId = pRequest->relation.nextRefId;
while (tmpRefId) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else { } else {
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); int64_t nextRefId = pUserReq->relation.nextRefId;
break; releaseRequest(pUserReq->self);
if (nextRefId) {
pUserReq = acquireRequest(nextRefId);
}
} }
} }
bool hasSubRequest = pUserReq != pRequest || pRequest->relation.prevRefId != 0;
for (int32_t i = reqIdx; i >= 0; i--) {
destroyCtxInRequest(pReqList[i]);
if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) {
pUserReq = pReqList[i];
} else {
removeRequest(pReqList[i]->self);
}
}
if (pUserReq) { if (pUserReq) {
destroyCtxInRequest(pUserReq);
pUserReq->prevCode = code; pUserReq->prevCode = code;
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation)); memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
} else { } else {
tscError("user req is missing"); tscError("User req is missing");
removeFromMostPrevReq(pRequest);
return; return;
} }
if (hasSubRequest)
removeFromMostPrevReq(pRequest);
else
releaseRequest(pUserReq->self);
doAsyncQuery(pUserReq, true); doAsyncQuery(pUserReq, true);
} }

View File

@ -452,20 +452,21 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
} }
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
int32_t newLen = pSource->varmeta.length;
memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows); memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) { if (pColumnInfoData->varmeta.allocLen < newLen) {
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pColumnInfoData->pData = tmp; pColumnInfoData->pData = tmp;
pColumnInfoData->varmeta.allocLen = pSource->varmeta.length; pColumnInfoData->varmeta.allocLen = newLen;
} }
pColumnInfoData->varmeta.length = pSource->varmeta.length; pColumnInfoData->varmeta.length = newLen;
if (pColumnInfoData->pData != NULL && pSource->pData != NULL) { if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length); memcpy(pColumnInfoData->pData, pSource->pData, newLen);
} }
} else { } else {
memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows)); memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
@ -1687,7 +1688,29 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
} }
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (n >= total || n == 0) return;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if (pColInfoData->varmeta.length != 0) {
int32_t newLen = pColInfoData->varmeta.offset[n];
if (-1 == newLen) {
for (int i = n - 1; i >= 0; --i) {
newLen = pColInfoData->varmeta.offset[i];
if (newLen != -1) {
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
newLen += getJsonValueLen(pColInfoData->pData + newLen);
} else {
newLen += varDataTLen(pColInfoData->pData + newLen);
}
break;
}
}
}
if (newLen <= -1) {
uFatal("colDataKeepFirstNRows: newLen:%d old:%d", newLen, pColInfoData->varmeta.length);
} else {
pColInfoData->varmeta.length = newLen;
}
}
// pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n); // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
memset(&pColInfoData->varmeta.offset[n], 0, total - n); memset(&pColInfoData->varmeta.offset[n], 0, total - n);
} }

View File

@ -269,7 +269,6 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024;
bool tsFilterScalarMode = false; bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 1000; int tsStreamAggCnt = 1000;
bool tsDisableCount = true;
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>"; char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>"; char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
@ -541,8 +540,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "disableCount", tsDisableCount, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
return 0; return 0;
} }
@ -1109,8 +1106,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
tsExperimental = cfgGetItem(pCfg, "experimental")->bval; tsExperimental = cfgGetItem(pCfg, "experimental")->bval;
tsDisableCount = cfgGetItem(pCfg, "disableCount")->bval;
return 0; return 0;
} }
@ -1739,8 +1734,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"shellActivityTimer", &tsShellActivityTimer}, {"shellActivityTimer", &tsShellActivityTimer},
{"slowLogThreshold", &tsSlowLogThreshold}, {"slowLogThreshold", &tsSlowLogThreshold},
{"useAdapter", &tsUseAdapter}, {"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental}, {"experimental", &tsExperimental}};
{"disableCount", &tsDisableCount}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -1009,19 +1009,19 @@ int32_t tDeserializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexR
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) { // int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SEncoder encoder = {0}; // SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); // tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; // if (tStartEncode(&encoder) < 0) return -1;
tEndEncode(&encoder); // tEndEncode(&encoder);
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; // if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; // if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
int32_t tlen = encoder.pos; // int32_t tlen = encoder.pos;
tEncoderClear(&encoder); // tEncoderClear(&encoder);
return tlen; // return tlen;
} // }
int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) { int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
@ -1035,6 +1035,7 @@ int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *
return 0; return 0;
} }
int32_t tSerializeSMCreateFullTextReq(void *buf, int32_t bufLen, SMCreateFullTextReq *pReq) { int32_t tSerializeSMCreateFullTextReq(void *buf, int32_t bufLen, SMCreateFullTextReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -1059,32 +1060,32 @@ void tFreeSMCreateFullTextReq(SMCreateFullTextReq *pReq) {
// impl later // impl later
return; return;
} }
int32_t tSerializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) { // int32_t tSerializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
SEncoder encoder = {0}; // SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); // tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; // if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; // if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; // if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder); // tEndEncode(&encoder);
int32_t tlen = encoder.pos; // int32_t tlen = encoder.pos;
tEncoderClear(&encoder); // tEncoderClear(&encoder);
return tlen; // return tlen;
} // }
int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) { // int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextReq *pReq) {
SDecoder decoder = {0}; // SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); // tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; // if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; // if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; // if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder); // tEndDecode(&decoder);
tDecoderClear(&decoder); // tDecoderClear(&decoder);
return 0; // return 0;
} // }
int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
@ -1474,44 +1475,44 @@ void tFreeSStatisReq(SStatisReq *pReq) {
taosMemoryFreeClear(pReq->pCont); taosMemoryFreeClear(pReq->pCont);
} }
int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { // int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
SEncoder encoder = {0}; // SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); // tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; // if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1; // if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1; // if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1;
if (tEncodeI32(&encoder, pReq->maxUsers) < 0) return -1; // if (tEncodeI32(&encoder, pReq->maxUsers) < 0) return -1;
if (tEncodeI32(&encoder, pReq->maxDbs) < 0) return -1; // if (tEncodeI32(&encoder, pReq->maxDbs) < 0) return -1;
if (tEncodeI32(&encoder, pReq->maxTimeSeries) < 0) return -1; // if (tEncodeI32(&encoder, pReq->maxTimeSeries) < 0) return -1;
if (tEncodeI32(&encoder, pReq->maxStreams) < 0) return -1; // if (tEncodeI32(&encoder, pReq->maxStreams) < 0) return -1;
if (tEncodeI32(&encoder, pReq->accessState) < 0) return -1; // if (tEncodeI32(&encoder, pReq->accessState) < 0) return -1;
if (tEncodeI64(&encoder, pReq->maxStorage) < 0) return -1; // if (tEncodeI64(&encoder, pReq->maxStorage) < 0) return -1;
tEndEncode(&encoder); // tEndEncode(&encoder);
int32_t tlen = encoder.pos; // int32_t tlen = encoder.pos;
tEncoderClear(&encoder); // tEncoderClear(&encoder);
return tlen; // return tlen;
} // }
int32_t tDeserializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { // int32_t tDeserializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
SDecoder decoder = {0}; // SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); // tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; // if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1; // if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1; // if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->maxUsers) < 0) return -1; // if (tDecodeI32(&decoder, &pReq->maxUsers) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->maxDbs) < 0) return -1; // if (tDecodeI32(&decoder, &pReq->maxDbs) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->maxTimeSeries) < 0) return -1; // if (tDecodeI32(&decoder, &pReq->maxTimeSeries) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->maxStreams) < 0) return -1; // if (tDecodeI32(&decoder, &pReq->maxStreams) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->accessState) < 0) return -1; // if (tDecodeI32(&decoder, &pReq->accessState) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->maxStorage) < 0) return -1; // if (tDecodeI64(&decoder, &pReq->maxStorage) < 0) return -1;
tEndDecode(&decoder); // tEndDecode(&decoder);
tDecoderClear(&decoder); // tDecoderClear(&decoder);
return 0; // return 0;
} // }
int32_t tSerializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq) { int32_t tSerializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
@ -5238,11 +5239,11 @@ int32_t tDeserializeSQueryCompactProgressRsp(void *buf, int32_t bufLen, SQueryCo
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -2;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -3;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -4;
if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -5;
if (tDecodeI32(&decoder, &pReq->finished) < 0) return -1; if (tDecodeI32(&decoder, &pReq->finished) < 0) return -6;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -7934,64 +7935,64 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
return 0; return 0;
} }
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { // static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tStartDecode(pDecoder) < 0) return -1; // if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1; // if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; // if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); // pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
if (NULL == pBlock->tblFName) return -1; // if (NULL == pBlock->tblFName) return -1;
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; // if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; // if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; // if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; // if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
int32_t meta = 0; // int32_t meta = 0;
if (tDecodeI32(pDecoder, &meta) < 0) return -1; // if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) { // if (meta) {
pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); // pBlock->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pBlock->pMeta) return -1; // if (NULL == pBlock->pMeta) return -1;
if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1; // if (tDecodeSTableMetaRsp(pDecoder, pBlock->pMeta) < 0) return -1;
} else { // } else {
pBlock->pMeta = NULL; // pBlock->pMeta = NULL;
} // }
tEndDecode(pDecoder); // tEndDecode(pDecoder);
return 0; // return 0;
} // }
int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) { // int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) {
int32_t nBlocks = taosArrayGetSize(pRsp->pArray); // int32_t nBlocks = taosArrayGetSize(pRsp->pArray);
if (tStartEncode(pEncoder) < 0) return -1; // if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1; // if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1; // if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1;
if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1; // if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1;
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) { // for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1; // if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1;
} // }
tEndEncode(pEncoder); // tEndEncode(pEncoder);
return 0; // return 0;
} // }
int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { // int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
if (tStartDecode(pDecoder) < 0) return -1; // if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1; // if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1; // if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1; // if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks)); // pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks));
if (pRsp->pBlocks == NULL) return -1; // if (pRsp->pBlocks == NULL) return -1;
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) { // for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; // if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
} // }
tEndDecode(pDecoder); // tEndDecode(pDecoder);
tDecoderClear(pDecoder); // tDecoderClear(pDecoder);
return 0; // return 0;
} // }
// void tFreeSSubmitBlkRsp(void *param) { // void tFreeSSubmitBlkRsp(void *param) {
// if (NULL == param) { // if (NULL == param) {

View File

@ -345,6 +345,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp; rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
rpcInit.dfp = destroyAhandle;
rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryStepFactor = tsRedirectFactor;

View File

@ -454,7 +454,7 @@ int32_t mndProcessKillCompactReq(SRpcMsg *pReq){
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[MND_COMPACT_ID_LEN] = {0}; char obj[TSDB_INT32_ID_LEN] = {0};
sprintf(obj, "%d", pCompact->compactId); sprintf(obj, "%d", pCompact->compactId);
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
@ -490,13 +490,17 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
} }
return -1; return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
} }
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
SQueryCompactProgressRsp req = {0}; SQueryCompactProgressRsp req = {0};
if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) { int32_t code = 0;
code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
if (code != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d",
code, pReq->pCont, pReq->contLen);
return -1; return -1;
} }
@ -504,10 +508,10 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1;
if(mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req) != 0){ code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
if(code != 0){
terrno = code;
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished);
return -1; return -1;

View File

@ -709,7 +709,8 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
int32_t code = syncProcessMsg(pMgmt->sync, pMsg); int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
if (code != 0) { if (code != 0) {
mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); mGError("vgId:1, failed to process sync msg:%p type:%s, errno: %s, code:0x%x", pMsg, TMSG_INFO(pMsg->msgType),
terrstr(), code);
} }
return code; return code;

View File

@ -789,25 +789,6 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_
return code; return code;
} }
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL;
char *err = NULL;
size_t vlen = 0;
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN;
char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
pLastCol = tsdbCacheDeserialize(value);
return pLastCol;
}
typedef struct { typedef struct {
int idx; int idx;
SLastKey key; SLastKey key;
@ -1052,6 +1033,25 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds); int nCols, int16_t *slotIds);
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL;
char *err = NULL;
size_t vlen = 0;
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN;
char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
pLastCol = tsdbCacheDeserialize(value);
return pLastCol;
}
int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
rocksdb_writebatch_t *wb = NULL; rocksdb_writebatch_t *wb = NULL;
int32_t code = 0; int32_t code = 0;
@ -1233,10 +1233,10 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
SArray* lastTmpColArray = NULL; SArray *lastTmpColArray = NULL;
SArray* lastTmpIndexArray = NULL; SArray *lastTmpIndexArray = NULL;
SArray* lastrowTmpColArray = NULL; SArray *lastrowTmpColArray = NULL;
SArray* lastrowTmpIndexArray = NULL; SArray *lastrowTmpIndexArray = NULL;
int lastIndex = 0; int lastIndex = 0;
int lastrowIndex = 0; int lastrowIndex = 0;
@ -1245,7 +1245,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
SIdxKey *idxKey = taosArrayGet(remainCols, i); SIdxKey *idxKey = taosArrayGet(remainCols, i);
slotIds[i] = pr->pSlotIds[idxKey->idx]; slotIds[i] = pr->pSlotIds[idxKey->idx];
if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) { if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) {
if(NULL == lastTmpIndexArray) { if (NULL == lastTmpIndexArray) {
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
} }
taosArrayPush(lastTmpIndexArray, &(i)); taosArrayPush(lastTmpIndexArray, &(i));
@ -1253,7 +1253,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx]; lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
lastIndex++; lastIndex++;
} else { } else {
if(NULL == lastrowTmpIndexArray) { if (NULL == lastrowTmpIndexArray) {
lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
} }
taosArrayPush(lastrowTmpIndexArray, &(i)); taosArrayPush(lastrowTmpIndexArray, &(i));
@ -1265,17 +1265,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol)); pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
if(lastTmpIndexArray != NULL) { if (lastTmpIndexArray != NULL) {
mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds);
for(int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) { for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i)); taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i));
} }
} }
if(lastrowTmpIndexArray != NULL) { if (lastrowTmpIndexArray != NULL) {
mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i)); taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
taosArrayGet(lastrowTmpColArray, i));
} }
} }

View File

@ -372,8 +372,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = syncProcessMsg(pVnode->sync, pMsg); int32_t code = syncProcessMsg(pVnode->sync, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg, vGError("vgId:%d, failed to process sync msg:%p type:%s, errno: %s, code:0x%x", pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), terrstr()); TMSG_INFO(pMsg->msgType), terrstr(), code);
} }
return code; return code;

View File

@ -94,10 +94,10 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pBlock->info.rows;) { for (int32_t i = 0; i < pBlock->info.rows;) {
int32_t step = pInfo->windowSliding;
SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow); SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow);
int32_t prevRows = pBuffInfo->winRows; int32_t prevRows = pBuffInfo->winRows;
int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows); int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
int32_t step = num;
if (prevRows == 0) { if (prevRows == 0) {
pInfo->pRow->win.skey = tsCols[i]; pInfo->pRow->win.skey = tsCols[i];
} }
@ -118,6 +118,8 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (prevRows <= pInfo->windowSliding) { if (prevRows <= pInfo->windowSliding) {
if (pBuffInfo->winRows > pInfo->windowSliding) { if (pBuffInfo->winRows > pInfo->windowSliding) {
step = pInfo->windowSliding - prevRows; step = pInfo->windowSliding - prevRows;
} else {
step = pInfo->windowSliding;
} }
} else { } else {
step = 0; step = 0;

View File

@ -1016,10 +1016,6 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (!pCxt->pPlanCxt->streamQuery && tsDisableCount) {
return TSDB_CODE_FAILED;
}
pWindow->winType = WINDOW_TYPE_COUNT; pWindow->winType = WINDOW_TYPE_COUNT;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect); pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder = pWindow->node.requireDataOrder =

View File

@ -22,7 +22,7 @@
#define DEFAULT_MAP_CAPACITY 131072 #define DEFAULT_MAP_CAPACITY 131072
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100)
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 100000 #define MAX_NUM_SCALABLE_BF 64
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1 #define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MAX_INTERVAL MILLISECOND_PER_MINUTE
@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= adjInterval) { if (watermark <= adjInterval) {
watermark = TMAX(originInt / adjInterval, 1) * adjInterval; watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { }
if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval; watermark = MAX_NUM_SCALABLE_BF * adjInterval;
} }
return watermark; return watermark;

View File

@ -256,21 +256,21 @@ void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool); bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
do { \ do { \
for (int i = 0; i < pool->nAsync; i++) { \ for (int i = 0; i < pool->nAsync; i++) { \
uv_async_t* async = &(pool->asyncs[i]); \ uv_async_t* async = &(pool->asyncs[i]); \
SAsyncItem* item = async->data; \ SAsyncItem* item = async->data; \
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \ while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
tTrace("destroy msg in async pool "); \ tTrace("destroy msg in async pool "); \
queue* h = QUEUE_HEAD(&item->qmsg); \ queue* h = QUEUE_HEAD(&item->qmsg); \
QUEUE_REMOVE(h); \ QUEUE_REMOVE(h); \
msgType* msg = QUEUE_DATA(h, msgType, q); \ msgType* msg = QUEUE_DATA(h, msgType, q); \
if (msg != NULL) { \ if (msg != NULL) { \
freeFunc(msg); \ freeFunc(msg, param); \
} \ } \
} \ } \
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \ #define ASYNC_CHECK_HANDLE(exh1, id) \

View File

@ -191,6 +191,15 @@ static void httpDestroyMsg(SHttpMsg* msg) {
taosMemoryFree(msg->cont); taosMemoryFree(msg->cont);
taosMemoryFree(msg); taosMemoryFree(msg);
} }
static void httpDestroyMsgWrapper(void* cont, void* param) {
httpDestroyMsg((SHttpMsg*)cont);
// if (msg == NULL) return;
// taosMemoryFree(msg->server);
// taosMemoryFree(msg->uri);
// taosMemoryFree(msg->cont);
// taosMemoryFree(msg);
}
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
SHttpMsg *msg = NULL, *quitMsg = NULL; SHttpMsg *msg = NULL, *quitMsg = NULL;
@ -554,7 +563,7 @@ void transHttpEnvDestroy() {
httpSendQuit(); httpSendQuit();
taosThreadJoin(load->thread, NULL); taosThreadJoin(load->thread, NULL);
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg); TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
transAsyncPoolDestroy(load->asyncPool); transAsyncPoolDestroy(load->asyncPool);
uv_loop_close(load->loop); uv_loop_close(load->loop);
taosMemoryFree(load->loop); taosMemoryFree(load->loop);

View File

@ -219,6 +219,8 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
/// NULL,cliHandleUpdate}; /// NULL,cliHandleUpdate};
static FORCE_INLINE void destroyCmsg(void* cmsg); static FORCE_INLINE void destroyCmsg(void* cmsg);
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param);
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
@ -1963,7 +1965,17 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
transFreeMsg(pMsg->msg.pCont); transFreeMsg(pMsg->msg.pCont);
taosMemoryFree(pMsg); taosMemoryFree(pMsg);
} }
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
SCliMsg* pMsg = arg;
if (pMsg == NULL) {
return;
}
if (param != NULL) {
SCliThrd* pThrd = param;
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
}
destroyCmsg(pMsg);
}
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
if (param == NULL) return; if (param == NULL) return;
@ -2057,7 +2069,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
CLI_RELEASE_UV(pThrd->loop); CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsgWrapper, (void*)pThrd);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);

View File

@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn); static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@ -671,7 +671,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
transFreeMsg(smsg->msg.pCont); transFreeMsg(smsg->msg.pCont);
taosMemoryFree(smsg); taosMemoryFree(smsg);
} }
static void destroyAllConn(SWorkThrd* pThrd) { static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd); tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) { while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn); queue* h = QUEUE_HEAD(&pThrd->conn);
@ -1394,7 +1395,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
} }
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList); uvWhiteListDestroy(pThrd->pWhiteList);

View File

@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_NOT_EXIST, "view not exists in db
//mnode-compact //mnode-compact
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COMPACT_ID, "Invalid compact id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COMPACT_ID, "Invalid compact id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST, "compact detail doesn't exist")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")

View File

@ -20,6 +20,9 @@
#define DEFAULT_GROWTH 2 #define DEFAULT_GROWTH 2
#define DEFAULT_TIGHTENING_RATIO 0.5 #define DEFAULT_TIGHTENING_RATIO 0.5
#define DEFAULT_MAX_BLOOMFILTERS 4
#define SBF_INVALID -1
#define SBF_VALID 0
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate); static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
if (pSBf == NULL) { if (pSBf == NULL) {
return NULL; return NULL;
} }
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
pSBf->status = SBF_VALID;
pSBf->numBits = 0; pSBf->numBits = 0;
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) { if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
} }
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
int32_t size = taosArrayGetSize(pSBf->bfArray); int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf); ASSERT(pNormalBf);
@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) { if (pNormalBf == NULL) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
} }
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray); int32_t size = taosArrayGetSize(pSBf->bfArray);
@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) { if (pNormalBf == NULL) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
} }
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray); int32_t size = taosArrayGetSize(pSBf->bfArray);
@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
} }
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) { static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
return NULL;
}
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
if (pNormalBf == NULL) { if (pNormalBf == NULL) {
return NULL; return NULL;
@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
} }
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1; if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1;
if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1;
return 0; return 0;
} }
@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
} }
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
return pSBf; return pSBf;
_error: _error:

View File

@ -1086,6 +1086,7 @@
,,y,script,./test.sh -f tsim/parser/join_multivnode.sim ,,y,script,./test.sh -f tsim/parser/join_multivnode.sim
,,y,script,./test.sh -f tsim/parser/join.sim ,,y,script,./test.sh -f tsim/parser/join.sim
,,y,script,./test.sh -f tsim/parser/last_cache.sim ,,y,script,./test.sh -f tsim/parser/last_cache.sim
,,y,script,./test.sh -f tsim/parser/last_both.sim
,,y,script,./test.sh -f tsim/parser/last_groupby.sim ,,y,script,./test.sh -f tsim/parser/last_groupby.sim
,,y,script,./test.sh -f tsim/parser/lastrow.sim ,,y,script,./test.sh -f tsim/parser/lastrow.sim
,,y,script,./test.sh -f tsim/parser/lastrow2.sim ,,y,script,./test.sh -f tsim/parser/lastrow2.sim

View File

@ -0,0 +1,150 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======================== dnode1 start
$db = testdb
sql drop database if exists $db
sql create database $db cachemodel 'both' minrows 10 stt_trigger 1
sql use $db
sql create stable st2 (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) tags (id int)
sql create table tb1 using st2 tags (1);
sql create table tb2 using st2 tags (2);
sql create table tb3 using st2 tags (3);
sql create table tb4 using st2 tags (4);
sql create table tb5 using st2 tags (1);
sql create table tb6 using st2 tags (2);
sql create table tb7 using st2 tags (3);
sql create table tb8 using st2 tags (4);
sql create table tb9 using st2 tags (5);
sql create table tba using st2 tags (5);
sql create table tbb using st2 tags (5);
sql create table tbc using st2 tags (5);
sql create table tbd using st2 tags (5);
sql create table tbe using st2 tags (5);
sql create table tbf using st2 tags (5);
sql insert into tb9 values ("2021-05-09 10:12:26.000",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.001",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.002",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.003",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.004",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.005",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.006",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.007",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.008",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.009",28, 29, '30', -1005)
sql delete from tb9 where ts = "2021-05-09 10:12:26.000"
sql flush database $db
sql insert into tb1 values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
sql insert into tb1 values ("2021-05-10 10:10:11", 4, 5.0, NULL, -2000)
sql insert into tb1 values ("2021-05-12 10:10:12", 6,NULL, NULL, -3000)
sql insert into tb2 values ("2021-05-09 10:11:13",-1,-2.0,'-3', -1001)
sql insert into tb2 values ("2021-05-10 10:11:14",-4,-5.0, NULL, -2001)
sql insert into tb2 values ("2021-05-11 10:11:15",-6, -7, '-8', -3001)
sql insert into tb3 values ("2021-05-09 10:12:17", 7, 8.0, '9' , -1002)
sql insert into tb3 values ("2021-05-09 10:12:17",10,11.0, NULL, -2002)
sql insert into tb3 values ("2021-05-09 10:12:18",12,NULL, NULL, -3002)
sql insert into tb4 values ("2021-05-09 10:12:19",13,14.0,'15' , -1003)
sql insert into tb4 values ("2021-05-10 10:12:20",16,17.0, NULL, -2003)
sql insert into tb4 values ("2021-05-11 10:12:21",18,NULL, NULL, -3003)
sql insert into tb5 values ("2021-05-09 10:12:22",19, 20, '21', -1004)
sql insert into tb6 values ("2021-05-11 10:12:23",22, 23, NULL, -2004)
sql insert into tb7 values ("2021-05-10 10:12:24",24,NULL, '25', -3004)
sql insert into tb8 values ("2021-05-11 10:12:25",26,NULL, '27', -4004)
sql insert into tba values ("2021-05-10 10:12:27",31, 32, NULL, -2005)
sql insert into tbb values ("2021-05-10 10:12:28",33,NULL, '35', -3005)
sql insert into tbc values ("2021-05-11 10:12:29",36, 37, NULL, -4005)
sql insert into tbd values ("2021-05-11 10:12:29",NULL,NULL,NULL,NULL )
sql drop table tbf;
sql alter table st2 add column c1 int;
sql alter table st2 drop column c1;
run tsim/parser/last_cache_query.sim
sql flush database $db
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
run tsim/parser/last_cache_query.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sql drop database if exists $db
sql create database $db minrows 10 stt_trigger 1
sql use $db
sql create stable st2 (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) tags (id int)
sql create table tb1 using st2 tags (1);
sql create table tb2 using st2 tags (2);
sql create table tb3 using st2 tags (3);
sql create table tb4 using st2 tags (4);
sql create table tb5 using st2 tags (1);
sql create table tb6 using st2 tags (2);
sql create table tb7 using st2 tags (3);
sql create table tb8 using st2 tags (4);
sql create table tb9 using st2 tags (5);
sql create table tba using st2 tags (5);
sql create table tbb using st2 tags (5);
sql create table tbc using st2 tags (5);
sql create table tbd using st2 tags (5);
sql create table tbe using st2 tags (5);
sql create table tbf using st2 tags (5);
sql insert into tb9 values ("2021-05-09 10:12:26.000",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.001",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.002",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.003",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.004",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.005",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.006",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.007",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.008",28, 29, '30', -1005)
sql insert into tb9 values ("2021-05-09 10:12:26.009",28, 29, '30', -1005)
sql delete from tb9 where ts = "2021-05-09 10:12:26.000"
sql flush database $db
sql insert into tb1 values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
sql insert into tb1 values ("2021-05-10 10:10:11", 4, 5.0, NULL, -2000)
sql insert into tb1 values ("2021-05-12 10:10:12", 6,NULL, NULL, -3000)
sql insert into tb2 values ("2021-05-09 10:11:13",-1,-2.0,'-3', -1001)
sql insert into tb2 values ("2021-05-10 10:11:14",-4,-5.0, NULL, -2001)
sql insert into tb2 values ("2021-05-11 10:11:15",-6, -7, '-8', -3001)
sql insert into tb3 values ("2021-05-09 10:12:17", 7, 8.0, '9' , -1002)
sql insert into tb3 values ("2021-05-09 10:12:17",10,11.0, NULL, -2002)
sql insert into tb3 values ("2021-05-09 10:12:18",12,NULL, NULL, -3002)
sql insert into tb4 values ("2021-05-09 10:12:19",13,14.0,'15' , -1003)
sql insert into tb4 values ("2021-05-10 10:12:20",16,17.0, NULL, -2003)
sql insert into tb4 values ("2021-05-11 10:12:21",18,NULL, NULL, -3003)
sql insert into tb5 values ("2021-05-09 10:12:22",19, 20, '21', -1004)
sql insert into tb6 values ("2021-05-11 10:12:23",22, 23, NULL, -2004)
sql insert into tb7 values ("2021-05-10 10:12:24",24,NULL, '25', -3004)
sql insert into tb8 values ("2021-05-11 10:12:25",26,NULL, '27', -4004)
sql insert into tba values ("2021-05-10 10:12:27",31, 32, NULL, -2005)
sql insert into tbb values ("2021-05-10 10:12:28",33,NULL, '35', -3005)
sql insert into tbc values ("2021-05-11 10:12:29",36, 37, NULL, -4005)
sql insert into tbd values ("2021-05-11 10:12:29",NULL,NULL,NULL,NULL )
sql drop table tbf
sql alter database $db cachemodel 'both'
sql alter database $db cachesize 2
sleep 11000
run tsim/parser/last_cache_query.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -357,6 +357,112 @@ if $data45 != 5 then
return -1 return -1
endi endi
sql select last_row(*), id from st2 group by id order by id
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print ===> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print ===> $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
print ===> $data30 $data31 $data32 $data33 $data34 $data35 $data36 $data37 $data38 $data39
print ===> $data40 $data41 $data42 $data43 $data44 $data45 $data46 $data47 $data48 $data49
if $rows != 5 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != NULL then
print $data02
return -1
endi
if $data03 != NULL then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data10 != @21-05-11 10:12:23.000@ then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data12 != 23.000000000 then
print $data02
return -1
endi
if $data13 != NULL then
return -1
endi
if $data14 != @70-01-01 07:59:58.-04@ then
return -1
endi
if $data15 != 2 then
return -1
endi
if $data20 != @21-05-10 10:12:24.000@ then
return -1
endi
if $data21 != 24 then
return -1
endi
if $data22 != NULL then
print expect NULL actual: $data22
return -1
endi
if $data23 != 25 then
return -1
endi
if $data24 != @70-01-01 07:59:57.-04@ then =
return -1
endi
if $data25 != 3 then
return -1
endi
if $data30 != @21-05-11 10:12:25.000@ then
return -1
endi
if $data31 != 26 then
return -1
endi
if $data32 != NULL then
print $data02
return -1
endi
if $data33 != 27 then
return -1
endi
if $data34 != @70-01-01 07:59:56.-04@ then
return -1
endi
if $data35 != 4 then
return -1
endi
if $data40 != @21-05-11 10:12:29.000@ then
return -1
endi
if $data41 != 36 then
return -1
endi
if $data42 != 37.000000000 then
print $data02
return -1
endi
if $data43 != NULL then
return -1
endi
if $data44 != @70-01-01 07:59:56.-05@ then
return -1
endi
if $data45 != 5 then
return -1
endi
print "test tbn" print "test tbn"
sql create table if not exists tbn (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) sql create table if not exists tbn (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp)
sql insert into tbn values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000) sql insert into tbn values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
@ -386,3 +492,5 @@ if $data04 != @70-01-01 07:59:57.000@ then
return -1 return -1
endi endi
sql alter table tbn add column c1 int;
sql alter table tbn drop column c1;

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 1; sql create database test vgroups 1;
sql use test; sql use test;
sql alter local 'disableCount' '0' ;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0); sql insert into t1 values(1648791213000,0,1,1,1.0);

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 4; sql create database test vgroups 4;
sql use test; sql use test;
sql alter local 'disableCount' '0' ;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1); sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2); sql create table t2 using st tags(2,2,2);

View File

@ -9,8 +9,6 @@ print =============== create database
sql create database test vgroups 1; sql create database test vgroups 1;
sql use test; sql use test;
sql alter local 'disableCount' '0' ;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0); sql insert into t1 values(1648791213000,0,1,1,1.0);

View File

@ -75,7 +75,7 @@ bool shellIsEmptyCommand(const char *cmd) {
int32_t shellRunSingleCommand(char *command) { int32_t shellRunSingleCommand(char *command) {
shellCmdkilled = false; shellCmdkilled = false;
if (shellIsEmptyCommand(command)) { if (shellIsEmptyCommand(command)) {
return 0; return 0;
} }
@ -1019,7 +1019,7 @@ void shellReadHistory() {
char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1); char *line = taosMemoryMalloc(TSDB_MAX_ALLOWED_SQL_LEN + 1);
int32_t read_size = 0; int32_t read_size = 0;
while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) != -1) { while ((read_size = taosGetsFile(pFile, TSDB_MAX_ALLOWED_SQL_LEN, line)) > 0) {
line[read_size - 1] = '\0'; line[read_size - 1] = '\0';
taosMemoryFree(pHistory->hist[pHistory->hend]); taosMemoryFree(pHistory->hist[pHistory->hend]);
pHistory->hist[pHistory->hend] = taosStrdup(line); pHistory->hist[pHistory->hend] = taosStrdup(line);
@ -1315,7 +1315,7 @@ int32_t shellExecute() {
shellSetConn(shell.conn, runOnce); shellSetConn(shell.conn, runOnce);
shellReadHistory(); shellReadHistory();
if(shell.args.is_bi_mode) { if(shell.args.is_bi_mode) {
// need set bi mode // need set bi mode
printf("Set BI mode is true.\n"); printf("Set BI mode is true.\n");
#ifndef WEBSOCKET #ifndef WEBSOCKET