Merge remote-tracking branch 'origin/3.0' into fix/valgrind

This commit is contained in:
Shengliang Guan 2022-08-09 19:11:04 +08:00
commit 0580f54090
19 changed files with 378 additions and 72 deletions

View File

@ -1,8 +1,8 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 29424d5
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG 97c4bac
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -9,7 +9,7 @@ description: TDengine Java 连接器基于标准 JDBC API 实现, 并提供原
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例2.4.0.0 及更高版本)。REST 连接实现的功能集合和原生连接有少量不同。
`taos-jdbcdriver` 是 TDengine 的官方 Java 语言连接器Java 开发人员可以通过它开发存取 TDengine 数据库的应用软件。`taos-jdbcdriver` 实现了 JDBC driver 标准的接口,并提供两种形式的连接器。一种是通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能,一种是通过 taosAdapter 提供的 REST 接口连接 TDengine 实例。REST 连接实现的功能集合和原生连接有少量不同。
![TDengine Database Connector Java](tdengine-jdbc-connector.webp)
@ -41,19 +41,19 @@ REST 连接支持所有能运行 Java 的平台。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对应类型转换如下:
| TDengine DataType | JDBCType driver 版本 < 2.0.24 | JDBCType driver 版本 >= 2.0.24 |
| ----------------- | --------------------------------- | ---------------------------------- |
| TIMESTAMP | java.lang.Long | java.sql.Timestamp |
| INT | java.lang.Integer | java.lang.Integer |
| BIGINT | java.lang.Long | java.lang.Long |
| FLOAT | java.lang.Float | java.lang.Float |
| DOUBLE | java.lang.Double | java.lang.Double |
| SMALLINT | java.lang.Short | java.lang.Short |
| TINYINT | java.lang.Byte | java.lang.Byte |
| BOOL | java.lang.Boolean | java.lang.Boolean |
| BINARY | java.lang.String | byte array |
| NCHAR | java.lang.String | java.lang.String |
| JSON | - | java.lang.String |
| TDengine DataType | JDBCType |
| ----------------- | ---------------------------------- |
| TIMESTAMP | java.sql.Timestamp |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| SMALLINT | java.lang.Short |
| TINYINT | java.lang.Byte |
| BOOL | java.lang.Boolean |
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
**注意**JSON 类型仅在 tag 中支持。
@ -198,7 +198,7 @@ url 中的配置参数如下:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 和 TDengine 2.4.0.12 版本开始JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- httpConnectTimeout: 连接超时时间,单位 ms 默认值为 5000。
@ -216,7 +216,7 @@ url 中的配置参数如下:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
```
- 从 taos-jdbcdriver-2.0.36 和 TDengine 2.2.0.0 版本开始,如果在 url 中指定了 dbname那么JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url在 SQL 中不需要指定 dbname。例如url 为 jdbc:TAOS-RS://127.0.0.1:6041/test那么可以执行 sqlinsert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname那么JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url在 SQL 中不需要指定 dbname。例如url 为 jdbc:TAOS-RS://127.0.0.1:6041/test那么可以执行 sqlinsert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
:::
@ -362,7 +362,7 @@ JDBC 连接器可能报错的错误码包括 3 种JDBC driver 本身的报错
### 通过参数绑定写入数据
从 2.1.2.0 版本开始,TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入INSERT场景的支持。采用这种方式写入数据时能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入INSERT场景的支持。采用这种方式写入数据时能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
**注意**
@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList<String> list, int size) throws
### 无模式写入
从 2.2.0.0 版本开始TDengine 增加了对无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](/reference/schemaless/)。
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。
**注意**
@ -859,20 +859,6 @@ public static void main(String[] args) throws Exception {
> 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。
**注意事项:**
- TDengine `v1.6.4.1` 版本开始提供了一个专门用于心跳检测的函数 `select server_status()`,所以在使用连接池时推荐使用 `select server_status()` 进行 Validation Query。
如下所示,`select server_status()` 执行成功会返回 `1`。
```sql
taos> select server_status();
server_status()|
================
1 |
Query OK, 1 row(s) in set (0.000141s)
```
### 更多示例程序
示例程序源码位于 `TDengine/examples/JDBC` 下:
@ -914,7 +900,7 @@ Query OK, 1 row(s) in set (0.000141s)
**解决方法**:重新安装 64 位 JDK。
4. 其它问题请参考 [FAQ](/train-faq/faq)
4. 其它问题请参考 [FAQ](../../../train-faq/faq)
## API 参考

View File

@ -47,6 +47,8 @@ bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, u
void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
#ifdef __cplusplus
}

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_BLOOMFILTER_H_
#include "os.h"
#include "tencode.h"
#include "thash.h"
#ifdef __cplusplus
@ -42,6 +43,8 @@ int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter *pBF);
bool tBloomFilterIsFull(const SBloomFilter *pBF);
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder);
SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder);
#ifdef __cplusplus
}

View File

@ -33,7 +33,8 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
uint32_t len);
void tScalableBfDestroy(SScalableBf *pSBf);
void tScalableBfDump(const SScalableBf *pSBf);
int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder);
SScalableBf* tScalableBfDecode(SDecoder* pDecoder);
#ifdef __cplusplus
}

View File

@ -355,7 +355,11 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
tsNumOfSupportVnodes = tsNumOfCores * 2;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;

View File

@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.lockEnable = true};
SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);

View File

@ -1537,6 +1537,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
if (pContext->pStmtCb && *pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
&context.pTableBlockHashObj);
if (NULL == context.pVgroupsHashObj) {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
if (NULL == context.pTableBlockHashObj) {
context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj =

View File

@ -61,6 +61,8 @@ typedef enum {
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_ASYNC_LAUNCH_TASK 0
typedef struct SSchDebug {
bool lockEnable;
bool apiEnable;
@ -281,6 +283,11 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
typedef struct SSchTaskCtx {
SSchJob *pJob;
SSchTask *pTask;
} SSchTaskCtx;
extern SSchedulerMgmt schMgmt;
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
@ -428,7 +435,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchFetchTask(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);

View File

@ -54,7 +54,7 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
sum += pTask->plan->execNodeStat.tableNum;
}
if (sum < schMgmt.cfg.maxNodeTableNum) {
if (schMgmt.cfg.maxNodeTableNum <= 0 || sum < schMgmt.cfg.maxNodeTableNum) {
SCH_JOB_DLOG("job no need flow ctrl, totalTableNum:%d", sum);
return TSDB_CODE_SUCCESS;
}
@ -230,7 +230,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) {

View File

@ -819,7 +819,10 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = pCtx->pJob;
SSchTask *pTask = pCtx->pTask;
int8_t status = 0;
int32_t code = 0;
@ -834,12 +837,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
@ -850,19 +853,51 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
SCH_ERR_JRET(code);
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
}
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
_return:
taosMemoryFree(param);
#if SCH_ASYNC_LAUNCH_TASK
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
if (code) {
code = schHandleJobFailure(pJob, code);
}
#endif
SCH_RET(code);
}
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx));
if (NULL == param) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
param->pJob = pJob;
param->pTask = pTask;
#if SCH_ASYNC_LAUNCH_TASK
taosAsyncExec(schLaunchTaskImpl, param, NULL);
#else
SCH_ERR_RET(schLaunchTaskImpl(param));
#endif
return TSDB_CODE_SUCCESS;
}
@ -878,10 +913,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
if (enough) {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
}
} else {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
}
return TSDB_CODE_SUCCESS;

View File

@ -14,6 +14,7 @@
*/
#include "tstreamUpdate.h"
#include "tencode.h"
#include "ttime.h"
#include "query.h"
@ -250,3 +251,110 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
tScalableBfDestroy(pInfo->pCloseWinSBF);
pInfo->pCloseWinSBF = NULL;
}
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
ASSERT(pInfo);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
}
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1;
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
}
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1;
}
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
ASSERT(pInfo);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosArrayPush(pInfo->pTsBuckets, &ts);
}
if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) return -1;
int32_t sBfSize = 0;
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = tScalableBfDecode(&decoder);
if (!pSBf) return -1;
taosArrayPush(pInfo->pTsSBFs, &pSBf);
}
if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
pInfo->pCloseWinSBF = tScalableBfDecode(&decoder);
int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
uint64_t uid = 0;
ts = INT64_MIN;
for(int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
}
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}

View File

@ -6,11 +6,37 @@
using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
bool equalSBF(SScalableBf* left, SScalableBf* right) {
if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false;
int lsize = taosArrayGetSize(left->bfArray);
int rsize = taosArrayGetSize(right->bfArray);
if (lsize != rsize) return false;
for (int32_t i = 0; i < lsize; i++) {
SBloomFilter* pLeftBF = (SBloomFilter*)taosArrayGetP(left->bfArray, i);
SBloomFilter* pRightBF = (SBloomFilter*)taosArrayGetP(right->bfArray, i);
if (pLeftBF->errorRate != pRightBF->errorRate) return false;
if (pLeftBF->expectedEntries != pRightBF->expectedEntries) return false;
if (pLeftBF->hashFn1 != pRightBF->hashFn1) return false;
if (pLeftBF->hashFn2 != pRightBF->hashFn2) return false;
if (pLeftBF->hashFunctions != pRightBF->hashFunctions) return false;
if (pLeftBF->numBits != pRightBF->numBits) return false;
if (pLeftBF->numUnits != pRightBF->numUnits) return false;
if (pLeftBF->size != pRightBF->size) return false;
uint64_t* leftUint = (uint64_t*) pLeftBF->buffer;
uint64_t* rightUint = (uint64_t*) pRightBF->buffer;
for (int32_t j = 0; j < pLeftBF->numUnits; j++) {
if (leftUint[j] != rightUint[j]) return false;
}
}
return true;
}
TEST(TD_STREAM_UPDATE_TEST, update) {
int64_t interval = 20 * 1000;
int64_t watermark = 10 * 60 * 1000;
const int64_t interval = 20 * 1000;
const int64_t watermark = 10 * 60 * 1000;
SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, -1), true);
for(int i=0; i < 1024; i++) {
@ -31,15 +57,16 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true);
}
TSKEY uid = 0;
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), false);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023);
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), true);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023);
SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
for(int i=1; i <= watermark / interval; i++) {
@ -75,7 +102,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval);
GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5);
TSKEY uid2 = 1;
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU2->pMap, &uid2, sizeof(uint64_t)), i * interval + 5);
}
SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
@ -84,7 +112,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU3, i, i * interval + 5 * j), false);
GTEST_ASSERT_EQ(pSU3->minTS, 0);
GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j);
uint64_t uid3 = i;
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU3->pMap, &uid3, sizeof(uint64_t)), i * interval + 5 * j);
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU3->pTsSBFs, i);
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0);
GTEST_ASSERT_EQ(pBF->size, j);
@ -92,13 +121,66 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
}
SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1);
GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU4->watermark, pSU4->interval);
GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU5->watermark, pSU4->interval);
GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU7 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
updateInfoAddCloseWindowSBF(pSU7);
for(int64_t i = 1; i < 2048000; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,i, i), false);
}
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,100, 1), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,110, 10), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,200, 20), true);
int32_t bufLen = updateInfoSerialize(NULL, 0, pSU7);
void* buf = taosMemoryCalloc(1, bufLen);
int32_t resSize = updateInfoSerialize(buf, bufLen, pSU7);
SUpdateInfo *pSU6 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
int32_t desSize = updateInfoDeserialize(buf, bufLen, pSU6);
GTEST_ASSERT_EQ(desSize, 0);
GTEST_ASSERT_EQ(pSU7->interval, pSU6->interval);
GTEST_ASSERT_EQ(pSU7->maxVersion, pSU6->maxVersion);
GTEST_ASSERT_EQ(pSU7->minTS, pSU6->minTS);
GTEST_ASSERT_EQ(pSU7->numBuckets, pSU6->numBuckets);
GTEST_ASSERT_EQ(pSU7->numSBFs, pSU6->numSBFs);
GTEST_ASSERT_EQ(pSU7->scanGroupId, pSU6->scanGroupId);
GTEST_ASSERT_EQ(pSU7->scanWindow.ekey, pSU6->scanWindow.ekey);
GTEST_ASSERT_EQ(pSU7->scanWindow.skey, pSU6->scanWindow.skey);
GTEST_ASSERT_EQ(pSU7->watermark, pSU6->watermark);
GTEST_ASSERT_EQ(equalSBF(pSU7->pCloseWinSBF, pSU6->pCloseWinSBF), true);
int32_t mapSize = taosHashGetSize(pSU7->pMap);
GTEST_ASSERT_EQ(mapSize, taosHashGetSize(pSU6->pMap));
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pSU7->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
void* value6 = taosHashGet(pSU6->pMap, key, keyLen);
GTEST_ASSERT_EQ(*(TSKEY*)pIte, *(TSKEY*)value6);
}
int32_t buSize = taosArrayGetSize(pSU7->pTsBuckets);
GTEST_ASSERT_EQ(buSize, taosArrayGetSize(pSU6->pTsBuckets));
for (int32_t i = 0; i < buSize; i++) {
TSKEY ts1 = *(TSKEY*)taosArrayGet(pSU7->pTsBuckets, i);
TSKEY ts2 = *(TSKEY*)taosArrayGet(pSU6->pTsBuckets, i);
GTEST_ASSERT_EQ(ts1, ts2);
}
int32_t lSize = taosArrayGetSize(pSU7->pTsSBFs);
int32_t rSize = taosArrayGetSize(pSU6->pTsSBFs);
GTEST_ASSERT_EQ(lSize, rSize);
for (int32_t i = 0; i < lSize; i++) {
SScalableBf* pLeftSBF = (SScalableBf*)taosArrayGetP(pSU7->pTsSBFs, i);
SScalableBf* pRightSBF = (SScalableBf*)taosArrayGetP(pSU6->pTsSBFs, i);
GTEST_ASSERT_EQ(equalSBF(pLeftSBF, pRightSBF), true);
}
updateInfoDestroy(pSU);
updateInfoDestroy(pSU1);
@ -106,6 +188,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
updateInfoDestroy(pSU3);
updateInfoDestroy(pSU4);
updateInfoDestroy(pSU5);
updateInfoDestroy(pSU6);
updateInfoDestroy(pSU7);
}
int main(int argc, char* argv[]) {

View File

@ -1271,6 +1271,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
} else {
// change error code for taos client driver if retryCnt exceeds limit
if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY;
}
}
}

View File

@ -108,8 +108,41 @@ void tBloomFilterDestroy(SBloomFilter *pBF) {
taosMemoryFree(pBF);
}
void tBloomFilterDump(const struct SBloomFilter *pBF) {
// ToDo
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder) {
if (tEncodeU32(pEncoder, pBF->hashFunctions) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->expectedEntries) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->numUnits) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->numBits) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->size) < 0) return -1;
for (uint64_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tEncodeU64(pEncoder, pUnits[i]) < 0) return -1;
}
if (tEncodeDouble(pEncoder, pBF->errorRate) < 0) return -1;
return 0;
}
SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder) {
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
pBF->buffer = NULL;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error;
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
for (int32_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
return pBF;
_error:
tBloomFilterDestroy(pBF);
return NULL;
}
bool tBloomFilterIsFull(const SBloomFilter *pBF) {

View File

@ -47,7 +47,7 @@ STaosError errors[] = {
// rpc
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Database not ready, need retry")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")

View File

@ -101,6 +101,42 @@ void tScalableBfDestroy(SScalableBf *pSBf) {
taosMemoryFree(pSBf);
}
void tScalableBfDump(const SScalableBf *pSBf) {
// Todo;
}
int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder) {
if (!pSBf) {
if (tEncodeI32(pEncoder, 0) < 0) return -1;
return 0;
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
SBloomFilter* pBF = taosArrayGetP(pSBf->bfArray, i);
if (tBloomFilterEncode(pBF, pEncoder) < 0) return -1;
}
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
return 0;
}
SScalableBf* tScalableBfDecode(SDecoder* pDecoder) {
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
pSBf->bfArray = NULL;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) goto _error;
if (size == 0) {
tScalableBfDestroy(pSBf);
return NULL;
}
pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *));
for (int32_t i = 0; i < size; i++) {
SBloomFilter* pBF = tBloomFilterDecode(pDecoder);
if (!pBF) goto _error;
taosArrayPush(pSBf->bfArray, &pBF);
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
return pSBf;
_error:
tScalableBfDestroy(pSBf);
return NULL;
}

View File

@ -24,12 +24,9 @@ class TDSimClient:
self.cfgDict = {
"numOfLogLines": "100000000",
"numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
"anyIp": "0",
"sdbDebugFlag": "135",
"rpcDebugFlag": "135",
"tmrDebugFlag": "131",
"cDebugFlag": "135",

View File

@ -30,7 +30,6 @@ class TDSimClient:
self.path = path
self.cfgDict = {
"numOfLogLines": "100000000",
"numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
@ -40,6 +39,7 @@ class TDSimClient:
"udebugFlag": "143",
"jnidebugFlag": "143",
"qdebugFlag": "143",
"supportVnodes": "1024",
"telemetryReporting": "0",
}
@ -117,8 +117,6 @@ class TDDnode:
self.valgrind = 0
self.remoteIP = ""
self.cfgDict = {
"walLevel": "2",
"fsync": "1000",
"monitor": "0",
"maxShellConns": "30000",
"locale": "en_US.UTF-8",
@ -139,6 +137,7 @@ class TDDnode:
"qdebugFlag": "143",
"numOfLogLines": "100000000",
"statusInterval": "1",
"supportVnodes": "1024",
"telemetryReporting": "0"
}