Merge branch '3.0' into fix/TD-29844

This commit is contained in:
Yihao Deng 2024-04-29 01:27:20 +00:00
commit 2beb46c9df
47 changed files with 510 additions and 232 deletions

View File

@ -25,7 +25,7 @@ create_definition:
col_name column_definition
column_definition:
type_name [comment 'string_value'] [PRIMARY KEY]
type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type']
table_options:
table_option ...
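As a sketch of the extended `column_definition` grammar above (the table and column names are illustrative, and the accepted encode/compress/level values may vary by TDengine version):

```sql
-- Illustrative only: per-column ENCODE/COMPRESS/LEVEL options
CREATE TABLE sensors (
    ts      TIMESTAMP,
    current FLOAT ENCODE 'delta-d'  COMPRESS 'lz4'  LEVEL 'medium',
    voltage INT   ENCODE 'simple8b' COMPRESS 'zstd' LEVEL 'high'
);
```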
@ -51,6 +51,7 @@ table_option: {
Only visible ASCII characters can be used with the escape character.
**Parameter description**
1. COMMENT: specifies comments for the table. This parameter can be used with supertables, standard tables, and subtables.
2. SMA: specifies functions on which to enable small materialized aggregates (SMA). SMA is user-defined precomputation of aggregates based on data blocks. Enter one of the following values: max, min, or sum. This parameter can be used with supertables and standard tables.
3. TTL: specifies the time to live (TTL) for the table. If TTL is specified when creating a table, TDengine automatically deletes the table once it has existed for longer than the TTL period. Note that the system may not delete the table at the exact moment the TTL expires, but it guarantees that the table will eventually be deleted. The unit of TTL is days. The default value is 0, i.e. never expire.
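A minimal sketch of the TTL option described above (the table name is illustrative):

```sql
-- Illustrative: TDengine may automatically delete this table
-- once it has existed for more than 7 days (TTL unit is days).
CREATE TABLE temp_readings (ts TIMESTAMP, v FLOAT) TTL 7;
```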
@ -104,6 +105,7 @@ alter_table_option: {
**More explanations**
You can perform the following modifications on existing tables:
1. ADD COLUMN: adds a column to the supertable.
2. DROP COLUMN: deletes a column from the supertable.
3. MODIFY COLUMN: changes the length of the data type specified for the column. Note that you can only specify a length greater than the current length.
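A sketch of the three modifications above (the supertable and column names are illustrative):

```sql
-- Illustrative supertable modifications matching items 1-3 above
ALTER STABLE meters ADD COLUMN phase FLOAT;
ALTER STABLE meters DROP COLUMN phase;
-- only a length greater than the current one is allowed
ALTER STABLE meters MODIFY COLUMN location BINARY(64);
```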
@ -154,6 +156,7 @@ alter_table_option: {
```
**More explanations**
1. Only the value of a tag can be modified directly. For all other modifications, you must modify the supertable from which the subtable was created.
### Change Tag Value of Subtable

View File

@ -28,9 +28,9 @@ In the function list, you can only specify supported aggregate functions (see be
Since the output of TSMA is a supertable, the row length of the output table is subject to the maximum row length limit. The sizes of the intermediate results of different functions vary but are generally larger than the original data. If the row length of the output table exceeds the maximum row length limit, an error `Row length exceeds max length` is reported. In this case, you need to reduce the number of functions or split commonly used function groups into multiple TSMA objects.
The window size is limited to [1ms ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds).
The window size is limited to [1m ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds).
TSMA is a database-level object, but it is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 8 and a range of [0-12]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created.
TSMA is a database-level object, but it is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 3 and a range of [0-3]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created.
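A minimal sketch of creating a TSMA within these limits (the names are illustrative; `avg` and `max` are assumed here to be among the supported aggregate functions):

```sql
-- Illustrative: window size (5m) falls inside the [1m ~ 1h] limit
CREATE TSMA tsma_avg_5m ON meters
  FUNCTION(avg(current), max(voltage))
  INTERVAL(5m);
```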
## Supported Functions
| function | comments |
@ -44,7 +44,6 @@ TSMA is a database-level object, but it is globally unique. The number of TSMA t
|count| If you want to use count(*), you should create the count(ts) function|
|spread||
|stddev||
|hyperloglog||
|||
## Drop TSMA
@ -65,6 +64,8 @@ Client configuration parameter: `querySmaOptimize`, used to control whether to u
Client configuration parameter: `maxTsmaCalcDelay`, in seconds, is used to control the acceptable TSMA calculation delay for users. If the calculation progress of a TSMA is within this range from the latest time, the TSMA will be used. If it exceeds this range, it will not be used. The default value is 600 (10 minutes), with a minimum value of 600 (10 minutes) and a maximum value of 86400 (1 day).
Client configuration parameter: `tsmaDataDeleteMark`, in milliseconds, consistent with the stream computing parameter `deleteMark`, is used to control the retention time of intermediate results in stream computing. The default value is 1 day, with a minimum value of 1 hour. Therefore, historical data that is older than the configuration parameter will not have the intermediate results saved in stream computing. If you modify the data within these time windows, the TSMA calculation results will not include the updated results. This means that the TSMA results will be inconsistent with querying the original data.
### Using TSMA During Queries
The aggregate functions defined in TSMA can be directly used in most query scenarios. If multiple TSMA are available, the one with the larger window size is preferred. For unclosed windows, the calculation can be done using smaller-window TSMA or the original data. However, there are certain scenarios where TSMA cannot be used (see below). In such cases, the entire query will be calculated using the original data.
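As a hedged example of the behavior described above (the table, columns, and time range are illustrative), an aggregate query like the following could be served from a matching TSMA when one exists, and falls back to the original data otherwise:

```sql
-- Illustrative: may be answered from a TSMA defined on avg/max
SELECT avg(current), max(voltage) FROM meters
  WHERE ts >= '2024-04-01 00:00:00' AND ts < '2024-04-02 00:00:00'
  INTERVAL(1h);
```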

View File

@ -250,6 +250,15 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
| Value Range | 600s - 86400s, 10 minutes to 1 day |
| Default value | 600s |
### tsmaDataDeleteMark
| Attribute | Description |
| -------- | --------------------------- |
| Applicable | Client only |
| Meaning | The duration for which the intermediate results of TSMA calculations are saved, in milliseconds |
| Value Range | >= 3600000, greater than or equal to 1 hour |
| Default value | 86400000, 1d |
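Putting the two client-side parameters above together, a `taos.cfg` fragment might look like this (values are illustrative, not recommendations):

```
# Illustrative taos.cfg client-side entries
maxTsmaCalcDelay     600        # seconds; accept TSMA results up to 10 min behind
tsmaDataDeleteMark   86400000   # milliseconds; keep TSMA intermediate results for 1 day
```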
## Locale Parameters
@ -776,8 +785,8 @@ The charset that takes effect is UTF-8.
| --------- | ----------------------------- |
| Applicable | Server Only |
| Meaning | Max num of TSMAs |
| Value Range | 0-12 |
| Default Value | 8 |
| Value Range | 0-3 |
| Default Value | 3 |
## 3.0 Parameters

View File

@ -23,7 +23,10 @@ create_subtable_clause: {
}
create_definition:
col_name column_type [PRIMARY KEY]
col_name column_definition
column_definition:
type_name [comment 'string_value'] [PRIMARY KEY] [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL 'level_type']
table_options:
table_option ...

View File

@ -28,9 +28,9 @@ TSMA can only be created on supertables and standard tables, not on subtables.
Since the output of TSMA is a supertable, the row length of the output table is subject to the maximum row length limit. The sizes of the intermediate results of different functions vary but are generally larger than the original data. If the row length of the output table exceeds the maximum row length limit, an error `Row length exceeds max length` is reported. In this case, you need to reduce the number of functions or split commonly used function groups into multiple TSMA objects.
The window size is limited to [1ms ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds).
The window size is limited to [1m ~ 1h]. The unit of INTERVAL is the same as the INTERVAL clause in the query, such as a (milliseconds), b (nanoseconds), h (hours), m (minutes), s (seconds), u (microseconds).
TSMA is a database-level object, but its name is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 8 and a range of [0-12]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created.
TSMA is a database-level object, but its name is globally unique. The number of TSMA that can be created in the cluster is limited by the parameter `maxTsmaNum`, with a default value of 3 and a range of [0-3]. Note that since TSMA background calculation uses stream computing, creating a TSMA will create a stream. Therefore, the number of TSMA that can be created is also limited by the number of existing streams and the maximum number of streams that can be created.
## Supported Functions
| function | comments |
@ -44,7 +44,6 @@ TSMA is a database-level object, but its name is globally unique. The number of TSMA that can be created in the cluster
|count| If you want to use count(*), you should create the count(ts) function|
|spread||
|stddev||
|hyperloglog||
|||
## Drop TSMA
@ -64,6 +63,8 @@ The calculation result of a TSMA is a supertable in the same database as the original table; this table
Client configuration parameter: `maxTsmaCalcDelay`, in seconds, is used to control the acceptable TSMA calculation delay for users. If the calculation progress of a TSMA is within this range from the latest time, the TSMA will be used. If it exceeds this range, it will not be used. The default value is 600 (10 minutes), with a minimum value of 600 (10 minutes) and a maximum value of 86400 (1 day).
Client configuration parameter: `tsmaDataDeleteMark`, in milliseconds, consistent with the stream computing parameter `deleteMark`, is used to control the retention time of intermediate results in stream computing. The default value is 1d, with a minimum value of 1h. Historical data older than this parameter relative to the latest record will therefore not have its intermediate results saved; if data within those time windows is modified, the TSMA results will not include the updates, i.e. they will be inconsistent with the results of querying the original data.
### Using TSMA During Queries
The aggregate functions defined in TSMA can be directly used in most query scenarios. If multiple TSMA are available, the one with the larger window size is preferred. For unclosed windows, the calculation can be done using smaller-window TSMA or the original data. However, there are certain scenarios where TSMA cannot be used (see below). In such cases, the entire query will be calculated using the original data.

View File

@ -249,6 +249,15 @@ taos -C
| Value Range | 600s - 86400s, 10 minutes to 1 day |
| Default Value | 600s |
### tsmaDataDeleteMark
| Attribute | Description |
| -------- | --------------------------- |
| Applicable | Client only |
| Meaning | Retention time of intermediate results for TSMA historical-data calculation, in milliseconds |
| Value Range | >= 3600000, i.e. at least 1 hour |
| Default Value | 86400000, i.e. 1 day |
## Locale Parameters
@ -761,8 +770,8 @@ The valid value of charset is UTF-8.
| -------- | --------------------------- |
| Applicable | Server only |
| Meaning | Number of TSMAs that can be created in the cluster |
| Value Range | 0-12 |
| Default Value | 8 |
| Value Range | 0-3 |
| Default Value | 3 |
## Compression Parameters

View File

@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
int32_t s3GetObjectToFile(const char *object_name, const char *fileName);
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)

View File

@ -223,6 +223,7 @@ extern int32_t tmqMaxTopicNum;
extern int32_t tmqRowSize;
extern int32_t tsMaxTsmaNum;
extern int32_t tsMaxTsmaCalcDelay;
extern int64_t tsmaDataDeleteMark;
// wal
extern int64_t tsWalFsyncDataSizeLimit;

View File

@ -60,6 +60,7 @@ typedef enum {
int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key);
int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
int64_t grantRemain(EGrantType grant);
int32_t grantCheck(EGrantType grant);
int32_t grantCheckExpire(EGrantType grant);
char *tGetMachineId();

View File

@ -56,6 +56,7 @@ extern "C" {
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@ -443,6 +444,7 @@ typedef struct SDownstreamStatusInfo {
typedef struct STaskCheckInfo {
SArray* pList;
int64_t startTs;
int64_t timeoutStartTs;
int32_t notReadyTasks;
int32_t inCheckProcess;
int32_t stopCheckProcess;
@ -547,7 +549,7 @@ typedef struct SStreamMeta {
SArray* chkpSaved;
SArray* chkpInUse;
SRWLatch chkpDirLock;
void* qHandle;
void* qHandle; // todo remove it
void* bkdChkptMgt;
} SStreamMeta;
@ -885,6 +887,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);

View File

@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef bool (*RpcNoDelayfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit {
@ -118,6 +119,8 @@ typedef struct SRpcInit {
// fail fast fp
RpcFFfp ffp;
RpcNoDelayfp noDelayFp;
int32_t connLimitNum;
int32_t connLimitLock;
int32_t timeToGetConn;

View File

@ -77,6 +77,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)

View File

@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
}
int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0;
@ -1733,6 +1733,6 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
int32_t s3GetObjectToFile(const char *object_name, char *fileName) { return 0; }
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { return 0; }
#endif

View File

@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
@ -193,6 +194,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -60,7 +60,7 @@ int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfTaskQueueThreads = 10;
int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
@ -308,8 +308,9 @@ int32_t tsS3UploadDelaySec = 60;
bool tsExperimental = true;
int32_t tsMaxTsmaNum = 8;
int32_t tsMaxTsmaNum = 3;
int32_t tsMaxTsmaCalcDelay = 600;
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
@ -552,12 +553,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
if (cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
tsNumOfTaskQueueThreads = tsNumOfCores;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 10);
if (tsNumOfTaskQueueThreads >= 50) {
tsNumOfTaskQueueThreads = 50;
}
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
@ -571,6 +569,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxTsmaCalcDelay", tsMaxTsmaCalcDelay, 600, 86400, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) !=
0)
return -1;
if (cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT,
CFG_DYN_CLIENT) != 0)
return -1;
return 0;
}
@ -751,7 +752,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 12, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 3, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
@ -1144,6 +1145,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval;
tsMaxTsmaCalcDelay = cfgGetItem(pCfg, "maxTsmaCalcDelay")->i32;
tsmaDataDeleteMark = cfgGetItem(pCfg, "tsmaDataDeleteMark")->i32;
return 0;
}
@ -1810,7 +1812,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"useAdapter", &tsUseAdapter},
{"experimental", &tsExperimental},
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}};
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
int32_t nVgroup = 0;
if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit;
if (nVgroup > 0) {
pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite));
pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup);
if (!pReq->pVloads) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite vload;
if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit;
taosArrayPush(pReq->pVloads, &vload);
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit;
}
}

View File

@ -49,7 +49,7 @@ typedef struct SDnodeMgmt {
// dmHandle.c
SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -169,23 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
SNotifyReq req = {0};
taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.dnodeId = pMgmt->pData->dnodeId;
taosThreadRwlockUnlock(&pMgmt->pData->lock);
req.clusterId = pMgmt->pData->clusterId;
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void * pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
void *pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, pReq);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "tgrant.h"
#include "thttp.h"
static void *dmStatusThreadFp(void *param) {
@ -47,21 +48,97 @@ static void *dmStatusThreadFp(void *param) {
}
SDmNotifyHandle dmNotifyHdl = {.state = 0};
#define TIMESERIES_STASH_NUM 5
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
return NULL;
}
// calculate approximate timeSeries per second
int64_t notifyTimeStamp[TIMESERIES_STASH_NUM];
int64_t notifyTimeSeries[TIMESERIES_STASH_NUM];
int64_t approximateTimeSeries = 0;
uint64_t nTotalNotify = 0;
int32_t head, tail = 0;
bool wait = true;
int32_t nDnode = 0;
int64_t lastNotify = 0;
int64_t lastFetchDnode = 0;
SNotifyReq req = {0};
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (wait) tsem_wait(&dmNotifyHdl.sem);
atomic_store_8(&dmNotifyHdl.state, 1);
dmSendNotifyReq(pMgmt);
int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
goto _skip;
}
int64_t current = taosGetTimestampMs();
if (current - lastFetchDnode > 1000) {
nDnode = dmGetDnodeSize(pMgmt->pData);
if (nDnode < 1) nDnode = 1;
lastFetchDnode = current;
}
if (req.dnodeId == 0 || req.clusterId == 0) {
req.dnodeId = pMgmt->pData->dnodeId;
req.clusterId = pMgmt->pData->clusterId;
}
if (current - lastNotify < 10) {
int64_t nCmprTimeSeries = approximateTimeSeries / 100;
if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
if (remainTimeSeries > nCmprTimeSeries * 10) {
taosMsleep(10);
} else if (remainTimeSeries > nCmprTimeSeries * 5) {
taosMsleep(5);
} else {
taosMsleep(2);
}
}
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t nVgroup = taosArrayGetSize(req.pVloads);
int64_t nTimeSeries = 0;
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
nTimeSeries += vload->nTimeSeries;
}
notifyTimeSeries[tail] = nTimeSeries;
notifyTimeStamp[tail] = taosGetTimestampNs();
++nTotalNotify;
approximateTimeSeries = 0;
if (nTotalNotify >= TIMESERIES_STASH_NUM) {
head = tail - TIMESERIES_STASH_NUM + 1;
if (head < 0) head += TIMESERIES_STASH_NUM;
int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
if (tsDiff > 0) {
if (timeDiff > 0 && timeDiff < 1e9) {
approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
dmSendNotifyReq(pMgmt, &req);
}
} else {
dmSendNotifyReq(pMgmt, &req);
}
}
} else {
dmSendNotifyReq(pMgmt, &req);
}
if (++tail == TIMESERIES_STASH_NUM) tail = 0;
tFreeSNotifyReq(&req);
lastNotify = taosGetTimestampMs();
_skip:
if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
wait = true;
continue;

View File

@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) {
return false;
}
}
static bool rpcNoDelayMsg(tmsg_t msgType) {
if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE ||
msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) {
return true;
}
return false;
}
int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
rpcInit.noDelayFp = rpcNoDelayMsg;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit);

View File

@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir);
int32_t dmInitDndInfo(SDnodeData *pData);
// dmEps.c
int32_t dmGetDnodeSize(SDnodeData *pData);
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);

View File

@ -355,6 +355,14 @@ _OVER:
return code;
}
int32_t dmGetDnodeSize(SDnodeData *pData) {
int32_t size = 0;
taosThreadRwlockRdlock(&pData->lock);
size = taosArrayGetSize(pData->dnodeEps);
taosThreadRwlockUnlock(&pData->lock);
return size;
}
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
taosThreadRwlockWrlock(&pData->lock);
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);

View File

@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
int64_t grantRemain(EGrantType grant) { return 0; }
char *tGetMachineId() { return NULL; };
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }

View File

@ -22,10 +22,10 @@
#include "mndPrivilege.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndStb.h"
#include "mndUser.h"
#include "mndView.h"
#include "mndSma.h"
#include "tglobal.h"
#include "tversion.h"
@ -57,6 +57,13 @@ typedef struct {
int64_t lastAccessTimeMs;
} SAppObj;
typedef struct {
int32_t totalDnodes;
int32_t onlineDnodes;
SEpSet epSet;
SArray *pQnodeList;
} SConnPreparedObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
@ -460,7 +467,7 @@ static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
}
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
SRpcConnInfo connInfo = pMsg->info.conn;
@ -503,11 +510,11 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
rspBasic->connId = pConn->id;
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
rspBasic->connId = pConn->id;
rspBasic->totalDnodes = pObj->totalDnodes;
rspBasic->onlineDnodes = pObj->onlineDnodes;
rspBasic->epSet = pObj->epSet;
rspBasic->pQnodeList = taosArrayDup(pObj->pQnodeList, NULL);
mndReleaseConn(pMnode, pConn, true);
@ -608,7 +615,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
#endif
case HEARTBEAT_KEY_TSMA: {
void * rspMsg = NULL;
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateTSMAInfo(pMnode, kv->value, kv->valueLen / sizeof(STSMAVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
@ -641,6 +648,12 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
return -1;
}
SConnPreparedObj obj = {0};
obj.totalDnodes = mndGetDnodeSize(pMnode);
mndGetOnlineDnodeNum(pMnode, &obj.onlineDnodes);
mndGetMnodeEpSet(pMnode, &obj.epSet);
mndCreateQnodeList(pMnode, &obj.pQnodeList, -1);
SClientHbBatchRsp batchRsp = {0};
batchRsp.svrTimestamp = taosGetTimestampSec();
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
@ -649,7 +662,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
for (int i = 0; i < sz; i++) {
SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp, &obj);
} else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
if (pRsp != NULL) {
@ -668,6 +681,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
pReq->info.rspLen = tlen;
pReq->info.rsp = buf;
taosArrayDestroy(obj.pQnodeList);
return 0;
}

View File

@ -1455,6 +1455,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->targetStbUid = 0;
pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0;
pCxt->pCreateStreamReq->deleteMark = pCxt->pCreateSmaReq->deleteMark;
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
pCxt->pCreateStreamReq->ast = taosStrdup(pCxt->pCreateSmaReq->ast);
@ -2340,11 +2341,6 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
}
_OVER:
if (code != 0) {
mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(),
tsmaReq.fetchingWithTsmaName);
}
tFreeTableTSMAInfoRsp(&rsp);
return code;
}

View File

@ -4216,7 +4216,10 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
SMnode *pMnode = pRsp->info.node;
SVFetchTtlExpiredTbsRsp rsp = {0};
SMndDropTbsWithTsmaCtx *pCtx = NULL;
if (pRsp->code != TSDB_CODE_SUCCESS) goto _end;
if (pRsp->code != TSDB_CODE_SUCCESS) {
terrno = pRsp->code;
goto _end;
}
if (pRsp->contLen == 0) {
code = 0;
goto _end;

View File

@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
// checkpoint backup type
char backup[20 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(backup, "none")
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
// history scan idle
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a");
@ -1644,10 +1651,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false);
// checkpoint info
// checkpoint version
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
// checkpoint backup status
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true);
// ds_err_info
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, 0, true);

View File

@ -23,6 +23,10 @@ typedef struct STaskUpdateEntry {
int32_t transId;
} STaskUpdateEntry;
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
} SMStreamCheckpointReadyRspMsg;
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
@ -518,63 +522,15 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
int64_t initTs = 0;
int64_t now = taosGetTimestampMs();
STaskId id = {.streamId = rsp.streamId, .taskId = rsp.upstreamTaskId};
STaskId fId = {0};
bool hasHistoryTask = false;
// todo extract method
if (!isLeader) {
// this task may have been stopped, so acquire task may failed. Retrieve it directly from the task hash map.
streamMetaRLock(pMeta);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
} else {
streamMetaRUnLock(pMeta);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
return code;
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
streamMetaRLock(pMeta);
// let's try to find this task in hashmap
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
streamMetaRUnLock(pMeta);
if (hasHistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
}
} else { // not present even in the meta hash map; forget it
streamMetaRUnLock(pMeta);
}
streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return code;
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
code = streamProcessCheckRsp(pTask, &rsp);
@ -582,10 +538,6 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
return code;
}
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
} SMStreamCheckpointReadyRspMsg;
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -868,6 +820,9 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);

View File

@ -3169,7 +3169,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "hyperloglog",
.type = FUNCTION_TYPE_HYPERLOGLOG,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC,
.translateFunc = translateHLL,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
@ -3181,7 +3181,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
#endif
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_partial",
.pStateFunc = "_hyperloglog_state",
.pMergeFunc = "_hyperloglog_merge"
},
{
@ -3211,7 +3210,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = NULL,
#endif
.combineFunc = hllCombine,
.pPartialFunc = "_hyperloglog_state_merge",
.pMergeFunc = "_hyperloglog_merge",
},
{
@ -4087,8 +4085,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastPartialFinalize,
},
{
.name = "_hyperloglog_state",
{ .name = "_hyperloglog_state",
.type = FUNCTION_TYPE_HYPERLOGLOG_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateHLLState,

View File

@ -10928,7 +10928,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = TIME_UNIT_MILLISECOND;
#define TSMA_MIN_INTERVAL_MS 1 // 1ms
#define TSMA_MIN_INTERVAL_MS 1000 * 60 // 1m
#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h
if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) {
return TSDB_CODE_TSMA_INVALID_INTERVAL;
@ -10989,6 +10989,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
}
if (TSDB_CODE_SUCCESS == code) {
pReq->deleteMark = convertTimePrecision(tsmaDataDeleteMark, TSDB_TIME_PRECISION_MILLI, pTableMeta->tableInfo.precision);
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}

View File

@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskBackupCheckpoint(char* id, char* path);
int32_t downloadCheckpoint(char* id, char* path);
int32_t deleteCheckpoint(char* id);
int32_t deleteCheckpointFile(char* id, char* name);
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -175,7 +175,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
stDebug("s-task:%s start check-rsp monit, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
@ -194,7 +194,7 @@ int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check rsp mon", id);
stDebug("s-task:%s set stop check-rsp monit", id);
return TSDB_CODE_SUCCESS;
}
@ -272,6 +272,8 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
}
pInfo->startTs = startTs;
pInfo->timeoutStartTs = startTs;
pInfo->stopCheckProcess = 0;
return TSDB_CODE_SUCCESS;
}
@ -293,7 +295,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
@ -329,7 +331,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
return TSDB_CODE_FAILED;
}
stDebug("s-task:%s set the in-check-procedure flag", id);
stDebug("s-task:%s set the in check-rsp flag", id);
return TSDB_CODE_SUCCESS;
}
@ -343,9 +345,10 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
}
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->timeoutStartTs = 0;
pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0;
@ -458,6 +461,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
ASSERT(pTask->status.downstreamReady == 0);
pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
@ -488,7 +492,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
}
}
@ -517,6 +521,30 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
// The action of adding the status may trigger the restart procedure, which should NEVER be executed
// in the timer thread. Restarting all tasks requires that no task has an active timer; therefore,
// executing the restart in the timer thread would result in a deadlock.
static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to add failed task async, code:%s", vgId, terrstr());
return -1;
}
stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
return 0;
}
// this function is executed in timer thread
void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamMeta* pMeta = pTask->pMeta;
@ -524,7 +552,7 @@ void rspMonitorFn(void* param, void* tmrId) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0;
@ -541,12 +569,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
return;
@ -577,7 +600,7 @@ void rspMonitorFn(void* param, void* tmrId) {
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
@ -614,13 +637,7 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with the related fill-history task if it exists.
streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList);
@ -639,8 +656,10 @@ void rspMonitorFn(void* param, void* tmrId) {
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
stDebug(
"s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
"ready:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);

View File

@ -26,6 +26,9 @@ typedef struct {
SStreamTask* pTask;
} SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(char* id, char* name);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -376,21 +379,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code;
}
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
}
int32_t getChkpMeta(char* id, char* path, SArray* list) {
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
int32_t code = downloadCheckpointByName(id, "META", file);
int32_t code = downloadCheckpointDataByName(id, "META", file);
if (code != 0) {
stDebug("chkp failed to download meta file:%s", file);
taosMemoryFree(file);
return code;
}
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
@ -428,7 +433,7 @@ int32_t uploadCheckpointData(void* param) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
}
if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId);
}
}
@ -457,8 +462,7 @@ int32_t uploadCheckpointData(void* param) {
return code;
}
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) {
return 0;
@ -514,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadChkp(pTask, ckId, (char*)id);
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, id, ckId);
}
@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}
@ -585,12 +589,12 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
stDebug("[s3] upload checkpoint:%s", filename);
// break;
}
taosCloseDir(&pDir);
taosCloseDir(&pDir);
return 0;
}
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname);
@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) {
}
// fileName: CURRENT
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("downloadCheckpointDataByName parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return 0;
} else if (tsS3StreamEnabled) {
return downloadCheckpointByNameS3(id, fname, dstName);
}
return 0;
}
@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) {
stError("downloadCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path);
} else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path);
}
return 0;
}

View File

@ -541,7 +541,7 @@ static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastEx
* todo: the batch of blocks should be tuned dynamically; according to the total elapsed time of each batch,
* an appropriate batch of blocks should be handled in 5 to 10 sec.
*/
int32_t doStreamExecTask(SStreamTask* pTask) {
static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.

View File

@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
}
@ -1707,3 +1707,38 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
return 0;
}
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info, into the task list.
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}

View File

@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
}
@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);

View File

@ -63,6 +63,7 @@ typedef struct {
bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
bool (*noDelayFp)(tmsg_t msgType);
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock

View File

@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->noDelayFp = pInit->noDelayFp;
pRpc->connLimitNum = pInit->connLimitNum;
if (pRpc->connLimitNum == 0) {
pRpc->connLimitNum = 20;

View File

@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) {
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd);
doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS);
}
taosMemoryFree(msglist);
@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId;
if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) {
tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pMsg = NULL;
return NULL;
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for available connection", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL;
} else {
@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) {
}
}
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
STransMsg transMsg = {0};
transMsg.contLen = 0;
transMsg.pCont = NULL;
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
transMsg.code = code;
transMsg.msgType = pMsg->msg.msgType + 1;
transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId;
@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) {
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst;
int32_t code = TSDB_CODE_RPC_MAX_SESSIONS;
QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd);
doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg);
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {

View File

@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")

View File

@ -103,7 +103,7 @@ class TDTestCase(TBase):
loop = 0
rets = []
overCnt = 0
while loop < 100:
while loop < 200:
time.sleep(3)
# check upload to s3

View File

@ -381,6 +381,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29793.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py

View File

@ -51,6 +51,8 @@ class TDSql:
def init(self, cursor, log=True):
self.cursor = cursor
self.sql = None
print(f"sqllog is :{log}")
if (log):
caller = inspect.getframeinfo(inspect.stack()[1][0])

View File

@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(251, 252))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))

View File

@ -0,0 +1,88 @@
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.csv import *
import os
import taos
import json
from taos import SmlProtocol, SmlPrecision
from taos.error import SchemalessError
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), True)
def run(self):
conn = taos.connect()
conn.execute("drop database if exists reproduce")
conn.execute("CREATE DATABASE reproduce")
conn.execute("USE reproduce")
# influxDB
conn.execute("drop table if exists meters")
lines1 = ["meters,location=California.LosAngeles groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249000",]
lines2 = ["meters,location=California.LosAngeles,groupid=2 groupid=2,current=11i32,voltage=221,phase=0.28 1648432611249001",]
lines3 = ["meters,location=California.LosAngeles,groupid=2 current=11i32,voltage=221,phase=0.28 1648432611249002",]
try:
conn.schemaless_insert(lines1, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
conn.schemaless_insert(lines2, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
conn.schemaless_insert(lines3, SmlProtocol.LINE_PROTOCOL, SmlPrecision.MICRO_SECONDS)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
# OpenTSDB
conn.execute("drop table if exists meters")
lines1 = ["meters 1648432611249 10i32 location=California.SanFrancisco groupid=2 groupid=3",]
lines2 = ["meters 1648432611250 10i32 groupid=2 location=California.SanFrancisco groupid=3",]
try:
conn.schemaless_insert(lines1, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
conn.schemaless_insert(lines2, SmlProtocol.TELNET_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
tdSql.checkEqual('expected error', 'no error occurred')
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
# OpenTSDB Json
conn.execute("drop table if exists meters")
lines1 = [{"metric": "meters", "timestamp": 1648432611249, "value": "a32", "tags": {"location": "California.SanFrancisco", "groupid": 2, "groupid": 3}}]
lines2 = [{"metric": "meters", "timestamp": 1648432611250, "value": "a32", "tags": {"groupid": 2, "location": "California.SanFrancisco", "groupid": 4}}]
try:
lines = json.dumps(lines1)
conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
# tdSql.checkEqual('expected error', 'no error occurred') TD-29850
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
try:
lines = json.dumps(lines2)
conn.schemaless_insert([lines], SmlProtocol.JSON_PROTOCOL, SmlPrecision.NOT_CONFIGURED)
# tdSql.checkEqual('expected error', 'no error occurred') TD-29850
except SchemalessError as errMsg:
tdSql.checkEqual(errMsg.msg, 'Duplicated column names')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -11,6 +11,8 @@ from util.common import *
ROUND = 1000
ignore_some_tests: int = 1
class TSMA:
def __init__(self):
self.tsma_name = ''
@ -601,7 +603,7 @@ class TSMATestSQLGenerator:
class TDTestCase:
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 8}
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3}
def __init__(self):
self.vgroups = 4
@ -802,6 +804,7 @@ class TDTestCase:
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
if not ignore_some_tests:
self.test_recursive_tsma()
self.test_query_interval_sliding()
self.test_union()
@ -812,6 +815,7 @@ class TDTestCase:
self.test_add_tag_col()
self.test_modify_col_name_value()
self.test_alter_tag_val()
if not ignore_some_tests:
self.test_ins_tsma()
def test_ins_tsma(self):
@ -862,7 +866,7 @@ class TDTestCase:
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999')
.should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.800').get_qc())
self.check(ctxs)
if not ignore_some_tests:
tdSql.execute('create database db2')
tdSql.execute('use db2')
tdSql.execute('create table db2.norm_tb(ts timestamp, c2 int)')
@ -898,8 +902,11 @@ class TDTestCase:
return result_str
def test_long_tsma_name(self):
self.drop_tsma('tsma2', 'test')
name = self.generate_random_string(178)
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)', 'last(ts)']
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'last(ts)']
if not ignore_some_tests:
tsma_func_list.append('hyperloglog(c2)')
self.create_tsma(name, 'test', 'meters', tsma_func_list, '55m')
sql = 'select last(c5), spread(c2) from test.meters interval(55m)'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(name).get_qc()
@ -953,7 +960,9 @@ class TDTestCase:
def test_recursive_tsma(self):
tdSql.execute('drop tsma test.tsma2')
tsma_func_list = ['last(ts)', 'avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)']
tsma_func_list = ['last(ts)', 'avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)']
if not ignore_some_tests:
tsma_func_list.append('hyperloglog(c2)')
select_func_list: List[str] = tsma_func_list.copy()
select_func_list.append('count(*)')
self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m')
@ -1389,6 +1398,8 @@ class TDTestCase:
tdSql.error(
'create tsma tsma_error_interval on nsdb.meters function(count(c2)) interval(10s) sliding(1m)')
if not ignore_some_tests:
# max tsma num 8
self.create_tsma('tsma2', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '10s')
self.create_tsma('tsma_test3', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '100s')
@ -1525,6 +1536,7 @@ class TDTestCase:
tdSql.error("CREATE TSMA T*\-sma129_ ON test.meters FUNCTION(count(c1)) INTERVAL(5m); ", -2147473920)
tdSql.error("CREATE TSMA Tsma_repeat ON test.meters FUNCTION(count(c1)) INTERVAL(5m); ", -2147482496)
self.drop_tsma('tsma_repeat', 'test')
# tsma name include escape character
tdSql.execute("CREATE TSMA `129_tsma` ON test.meters FUNCTION(count(c3)) INTERVAL(5m); ")
@ -1534,9 +1546,6 @@ class TDTestCase:
tdSql.execute("drop tsma test.`129_Tsma`")
tdSql.execute("drop tsma test.`129_T*\-sma`")
self.drop_tsma('tsma_repeat', 'test')
def test_create_and_drop_tsma(self, tsma_name: str, db_name: str = 'test', table_name: str = 'meters', func_list: List = ['avg(c1)', 'avg(c2)'], interval: str = '5m'):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')

View File

@ -38,6 +38,14 @@ class TDTestCase:
tdSql.execute("create topic topic_2 with meta as stable db1.st")
tdSql.execute("create topic topic_3 as select * from db1.nt")
tdSql.execute("create topic topic_4 as select ts,c3,c5,t2 from db1.st")
for i in range(5, 21):
tdSql.execute(f"create topic topic_{i} as select ts,c3,c5,t2 from db1.st")
tdSql.error("create topic topic_21 as select * from db1.nt")
tdSql.execute("create topic if not exists topic_1 as database db1")
for i in range(5, 21):
tdSql.execute(f"drop topic topic_{i}")
tdSql.query("select * from information_schema.ins_topics order by topic_name")
tdSql.checkRows(4)