Merge branch '3.0' into fix/TD-19468-2
This commit is contained in:
commit
c84618ea3f
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG cc973e0
|
GIT_TAG bc99376
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -67,7 +67,7 @@ The following return value results indicate that the verification passed.
|
||||||
## HTTP request URL format
|
## HTTP request URL format
|
||||||
|
|
||||||
```text
|
```text
|
||||||
http://<fqdn>:<port>/rest/sql/[db_name]
|
http://<fqdn>:<port>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
Parameter Description:
|
Parameter Description:
|
||||||
|
@ -75,6 +75,7 @@ Parameter Description:
|
||||||
- fqnd: FQDN or IP address of any host in the cluster.
|
- fqnd: FQDN or IP address of any host in the cluster.
|
||||||
- port: httpPort configuration item in the configuration file, default is 6041.
|
- port: httpPort configuration item in the configuration file, default is 6041.
|
||||||
- db_name: Optional parameter that specifies the default database name for the executed SQL command.
|
- db_name: Optional parameter that specifies the default database name for the executed SQL command.
|
||||||
|
- tz: Optional parameter that specifies the timezone of the returned time, following the IANA Time Zone rules, e.g. `America/New_York`.
|
||||||
|
|
||||||
For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`.
|
For example, `http://h1.taos.com:6041/rest/sql/test` is a URL to `h1.taos.com:6041` and sets the default database name to `test`.
|
||||||
|
|
||||||
|
@ -97,13 +98,13 @@ The HTTP request's BODY is a complete SQL command, and the data table in the SQL
|
||||||
Use `curl` to initiate an HTTP request with a custom authentication method, with the following syntax.
|
Use `curl` to initiate an HTTP request with a custom authentication method, with the following syntax.
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
or
|
or
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
where `TOKEN` is the string after Base64 encoding of `{username}:{password}`, e.g. `root:taosdata` is encoded as `cm9vdDp0YW9zZGF0YQ==`..
|
where `TOKEN` is the string after Base64 encoding of `{username}:{password}`, e.g. `root:taosdata` is encoded as `cm9vdDp0YW9zZGF0YQ==`..
|
||||||
|
|
|
@ -69,7 +69,7 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \
|
||||||
## HTTP 请求格式
|
## HTTP 请求格式
|
||||||
|
|
||||||
```text
|
```text
|
||||||
http://<fqdn>:<port>/rest/sql/[db_name]
|
http://<fqdn>:<port>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
参数说明:
|
参数说明:
|
||||||
|
@ -77,6 +77,7 @@ http://<fqdn>:<port>/rest/sql/[db_name]
|
||||||
- fqdn: 集群中的任一台主机 FQDN 或 IP 地址。
|
- fqdn: 集群中的任一台主机 FQDN 或 IP 地址。
|
||||||
- port: 配置文件中 httpPort 配置项,缺省为 6041。
|
- port: 配置文件中 httpPort 配置项,缺省为 6041。
|
||||||
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
|
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
|
||||||
|
- tz: 可选参数,指定返回时间的时区,遵照 IANA Time Zone 规则,如 `America/New_York`。
|
||||||
|
|
||||||
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
|
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
|
||||||
|
|
||||||
|
@ -99,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据
|
||||||
使用 `curl` 通过自定义身份认证方式来发起一个 HTTP Request,语法如下:
|
使用 `curl` 通过自定义身份认证方式来发起一个 HTTP Request,语法如下:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
或者,
|
或者,
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timezone]
|
||||||
```
|
```
|
||||||
|
|
||||||
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`。
|
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`。
|
||||||
|
|
|
@ -164,9 +164,9 @@
|
||||||
#define TK_SCORES 146
|
#define TK_SCORES 146
|
||||||
#define TK_TOPICS 147
|
#define TK_TOPICS 147
|
||||||
#define TK_VARIABLES 148
|
#define TK_VARIABLES 148
|
||||||
#define TK_BNODES 149
|
#define TK_CLUSTER 149
|
||||||
#define TK_SNODES 150
|
#define TK_BNODES 150
|
||||||
#define TK_CLUSTER 151
|
#define TK_SNODES 151
|
||||||
#define TK_TRANSACTIONS 152
|
#define TK_TRANSACTIONS 152
|
||||||
#define TK_DISTRIBUTED 153
|
#define TK_DISTRIBUTED 153
|
||||||
#define TK_CONSUMERS 154
|
#define TK_CONSUMERS 154
|
||||||
|
|
|
@ -290,6 +290,7 @@ typedef struct {
|
||||||
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||||
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
|
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
|
||||||
|
|
||||||
|
#if 0
|
||||||
// TODO remove this function
|
// TODO remove this function
|
||||||
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct tDataTypeDescriptor {
|
typedef struct tDataTypeDescriptor {
|
||||||
int16_t type;
|
int16_t type;
|
||||||
|
|
|
@ -274,6 +274,7 @@ typedef struct SShowTableDistributedStmt {
|
||||||
typedef struct SShowDnodeVariablesStmt {
|
typedef struct SShowDnodeVariablesStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
SNode* pDnodeId;
|
SNode* pDnodeId;
|
||||||
|
SNode* pLikePattern;
|
||||||
} SShowDnodeVariablesStmt;
|
} SShowDnodeVariablesStmt;
|
||||||
|
|
||||||
typedef struct SShowVnodesStmt {
|
typedef struct SShowVnodesStmt {
|
||||||
|
|
|
@ -347,11 +347,6 @@ typedef struct SInsertStmt {
|
||||||
uint8_t precision;
|
uint8_t precision;
|
||||||
} SInsertStmt;
|
} SInsertStmt;
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
PAYLOAD_TYPE_KV = 0,
|
|
||||||
PAYLOAD_TYPE_RAW = 1,
|
|
||||||
} EPayloadType;
|
|
||||||
|
|
||||||
typedef struct SVgDataBlocks {
|
typedef struct SVgDataBlocks {
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
int32_t numOfTables; // number of tables in current submit block
|
int32_t numOfTables; // number of tables in current submit block
|
||||||
|
@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt {
|
||||||
ENodeType nodeType;
|
ENodeType nodeType;
|
||||||
ENodeType sqlNodeType;
|
ENodeType sqlNodeType;
|
||||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
|
||||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||||
const char* sql; // current sql statement position
|
const char* sql; // current sql statement position
|
||||||
} SVnodeModifOpStmt;
|
} SVnodeModifOpStmt;
|
||||||
|
|
|
@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
|
@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
|
||||||
|
|
||||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||||
|
@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
|
||||||
|
|
||||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||||
|
|
|
@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
|
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
|
||||||
|
|
||||||
if (pTscObj) {
|
if (pTscObj) {
|
||||||
info->taos = pTscObj;
|
info->taos = pTscObj;
|
||||||
|
|
|
@ -230,8 +230,8 @@ static const SSysDbTableSchema transSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema configSchema[] = {
|
static const SSysDbTableSchema configSchema[] = {
|
||||||
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema variablesSchema[] = {
|
static const SSysDbTableSchema variablesSchema[] = {
|
||||||
|
@ -282,7 +282,7 @@ static const SSysTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USERS, userUsersSchema, tListLen(userUsersSchema), false},
|
{TSDB_INS_TABLE_USERS, userUsersSchema, tListLen(userUsersSchema), false},
|
||||||
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema), true},
|
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema), true},
|
||||||
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema), true},
|
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema), true},
|
||||||
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema), true},
|
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema), false},
|
||||||
{TSDB_INS_TABLE_DNODE_VARIABLES, variablesSchema, tListLen(variablesSchema), true},
|
{TSDB_INS_TABLE_DNODE_VARIABLES, variablesSchema, tListLen(variablesSchema), true},
|
||||||
{TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false},
|
{TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false},
|
||||||
{TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false},
|
{TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false},
|
||||||
|
|
|
@ -5158,7 +5158,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
|
||||||
char name[TSDB_COL_NAME_LEN] = {0};
|
char name[TSDB_COL_NAME_LEN] = {0};
|
||||||
char *tmp = NULL;
|
char *tmp = NULL;
|
||||||
if (tDecodeCStr(pCoder, &tmp) < 0) return -1;
|
if (tDecodeCStr(pCoder, &tmp) < 0) return -1;
|
||||||
strcpy(name, tmp);
|
strncpy(name, tmp, TSDB_COL_NAME_LEN - 1);
|
||||||
taosArrayPush(pReq->ctb.tagName, name);
|
taosArrayPush(pReq->ctb.tagName, name);
|
||||||
}
|
}
|
||||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||||
|
|
|
@ -187,7 +187,9 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
if (pTSBuf->autoDelete) {
|
if (pTSBuf->autoDelete) {
|
||||||
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
|
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
|
||||||
taosRemoveFile(pTSBuf->path);
|
if (taosRemoveFile(pTSBuf->path) != 0) {
|
||||||
|
// tscError("tsBuf %p destroyed, failed to remove tmp file:%s", pTSBuf, pTSBuf->path);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
|
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
|
||||||
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
|
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
bool first = true;
|
|
||||||
int32_t valid = 0;
|
int32_t valid = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
void *entryKey = NULL;
|
void *entryKey = NULL;
|
||||||
|
@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
|
||||||
|
|
||||||
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
|
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
|
||||||
if (cmp == 0) taosArrayPush(pUids, &p->uid);
|
if (cmp == 0) taosArrayPush(pUids, &p->uid);
|
||||||
if (cmp == -1) break;
|
|
||||||
|
if (param->reverse == false) {
|
||||||
|
if (cmp == -1) break;
|
||||||
|
} else if (param->reverse) {
|
||||||
|
if (cmp == 1) break;
|
||||||
|
}
|
||||||
|
|
||||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||||
if (valid < 0) break;
|
if (valid < 0) break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
|
static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
|
||||||
ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
|
if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
ncolKey->uid = pME->uid;
|
ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
|
||||||
|
ncolKey->uid = pME->uid;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -777,9 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// search the column to add/drop/update
|
// search the column to add/drop/update
|
||||||
pSchema = &entry.ntbEntry.schemaRow;
|
pSchema = &entry.ntbEntry.schemaRow;
|
||||||
|
|
||||||
|
// save old entry
|
||||||
|
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
|
||||||
|
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
|
||||||
|
|
||||||
int32_t iCol = 0;
|
int32_t iCol = 0;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
pColumn = NULL;
|
pColumn = NULL;
|
||||||
|
@ -872,6 +880,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
|
|
||||||
entry.version = version;
|
entry.version = version;
|
||||||
|
|
||||||
|
metaDeleteNcolIdx(pMeta, &oldEntry);
|
||||||
|
metaUpdateNcolIdx(pMeta, &entry);
|
||||||
|
|
||||||
// do actual write
|
// do actual write
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ typedef struct SDataBlockIter {
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||||
int32_t order;
|
int32_t order;
|
||||||
SDataBlk block; // current SDataBlk data
|
SDataBlk block; // current SDataBlk data
|
||||||
SHashObj* pTableMap;
|
SHashObj* pTableMap;
|
||||||
} SDataBlockIter;
|
} SDataBlockIter;
|
||||||
|
|
||||||
|
@ -1209,19 +1209,19 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
||||||
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
|
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
||||||
int32_t* nextIndex, int32_t order) {
|
int32_t* nextIndex, int32_t order) {
|
||||||
bool asc = ASCENDING_TRAVERSE(order);
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
|
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!asc && pFBlockInfo->tbBlockIdx == 0) {
|
if (!asc && pBlockInfo->tbBlockIdx == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||||
|
|
||||||
SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
|
SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
|
||||||
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||||
|
@ -1631,7 +1631,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3768,6 +3768,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
|
||||||
|
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
|
||||||
|
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
||||||
ASSERT(pDataBlockInfo != NULL && pReader != NULL);
|
ASSERT(pDataBlockInfo != NULL && pReader != NULL);
|
||||||
pDataBlockInfo->rows = pReader->pResBlock->info.rows;
|
pDataBlockInfo->rows = pReader->pResBlock->info.rows;
|
||||||
|
|
|
@ -280,8 +280,8 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
||||||
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchInfo));
|
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
|
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
|
||||||
|
|
|
@ -1073,7 +1073,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
|
|
||||||
pMatchInfo->matchType = type;
|
pMatchInfo->matchType = type;
|
||||||
|
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -2881,7 +2881,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
|
||||||
default:
|
default:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 1;
|
return cmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) {
|
static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) {
|
||||||
|
@ -2987,10 +2987,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
|
||||||
.val = pVal->datum.p,
|
.val = pVal->datum.p,
|
||||||
.reverse = reverse,
|
.reverse = reverse,
|
||||||
.filterFunc = func};
|
.filterFunc = func};
|
||||||
|
|
||||||
int32_t ret = metaFilterCreateTime(pMeta, ¶m, result);
|
|
||||||
if (ret == 0) return 0;
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3002,15 +2998,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
|
||||||
bool reverse = false;
|
bool reverse = false;
|
||||||
|
|
||||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||||
SMetaFltParam param = {.suid = 0,
|
|
||||||
.cid = 0,
|
|
||||||
.type = TSDB_DATA_TYPE_BIGINT,
|
|
||||||
.val = &pVal->datum.i,
|
|
||||||
.reverse = reverse,
|
|
||||||
.filterFunc = func};
|
|
||||||
int32_t ret = metaFilterCreateTime(pMeta, ¶m, result);
|
|
||||||
if (func == NULL) return -1;
|
if (func == NULL) return -1;
|
||||||
return 0;
|
|
||||||
|
SMetaFltParam param = {.suid = 0,
|
||||||
|
.cid = 0,
|
||||||
|
.type = TSDB_DATA_TYPE_BIGINT,
|
||||||
|
.val = &pVal->datum.i,
|
||||||
|
.reverse = reverse,
|
||||||
|
.filterFunc = func};
|
||||||
|
|
||||||
|
int32_t ret = metaFilterCreateTime(pMeta, ¶m, result);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) {
|
static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) {
|
||||||
void* pMeta = ((SSTabFltArg*)arg)->pMeta;
|
void* pMeta = ((SSTabFltArg*)arg)->pMeta;
|
||||||
|
@ -3073,7 +3071,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) {
|
||||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||||
EOperatorType opType = pOper->opType;
|
EOperatorType opType = pOper->opType;
|
||||||
if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN &&
|
if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN &&
|
||||||
OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) {
|
opType != OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -209,7 +209,7 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
|
||||||
return numOfSpaces;
|
return numOfSpaces;
|
||||||
}
|
}
|
||||||
|
|
||||||
void static addTimezoneParam(SNodeList* pList) {
|
static int32_t addTimezoneParam(SNodeList* pList) {
|
||||||
char buf[6] = {0};
|
char buf[6] = {0};
|
||||||
time_t t = taosTime(NULL);
|
time_t t = taosTime(NULL);
|
||||||
struct tm tmInfo;
|
struct tm tmInfo;
|
||||||
|
@ -218,6 +218,10 @@ void static addTimezoneParam(SNodeList* pList) {
|
||||||
int32_t len = (int32_t)strlen(buf);
|
int32_t len = (int32_t)strlen(buf);
|
||||||
|
|
||||||
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (pVal == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pVal->literal = strndup(buf, len);
|
pVal->literal = strndup(buf, len);
|
||||||
pVal->isDuration = false;
|
pVal->isDuration = false;
|
||||||
pVal->translate = true;
|
pVal->translate = true;
|
||||||
|
@ -229,10 +233,15 @@ void static addTimezoneParam(SNodeList* pList) {
|
||||||
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
|
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
|
||||||
|
|
||||||
nodesListAppend(pList, (SNode*)pVal);
|
nodesListAppend(pList, (SNode*)pVal);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
|
static int32_t addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
|
||||||
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (pVal == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pVal->literal = NULL;
|
pVal->literal = NULL;
|
||||||
pVal->isDuration = false;
|
pVal->isDuration = false;
|
||||||
pVal->translate = true;
|
pVal->translate = true;
|
||||||
|
@ -244,6 +253,7 @@ void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
|
||||||
pVal->typeData = (int64_t)precision;
|
pVal->typeData = (int64_t)precision;
|
||||||
|
|
||||||
nodesListMakeAppend(pList, (SNode*)pVal);
|
nodesListMakeAppend(pList, (SNode*)pVal);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// There is only one parameter of numeric type, and the return type is parameter type
|
// There is only one parameter of numeric type, and the return type is parameter type
|
||||||
|
@ -465,7 +475,10 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
|
|
||||||
// add database precision as param
|
// add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1487,7 +1500,10 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
|
|
||||||
// add database precision as param
|
// add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1810,7 +1826,10 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
|
||||||
// add database precision as param
|
// add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1844,7 +1863,10 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format");
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format");
|
||||||
}
|
}
|
||||||
} else { // add default client timezone
|
} else { // add default client timezone
|
||||||
addTimezoneParam(pFunc->pParameterList);
|
int32_t code = addTimezoneParam(pFunc->pParameterList);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set result type
|
// set result type
|
||||||
|
@ -1863,7 +1885,10 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
|
||||||
|
|
||||||
// add database precision as param
|
// add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1894,7 +1919,10 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
|
||||||
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
|
||||||
}
|
}
|
||||||
|
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pFunc->node.resType =
|
pFunc->node.resType =
|
||||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
|
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
|
@ -1935,7 +1963,10 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1861,7 +1861,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->usum += plist[i];
|
pStddevRes->usum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticUSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -1877,7 +1877,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->usum += plist[i];
|
pStddevRes->usum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticUSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1892,7 +1892,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->usum += plist[i];
|
pStddevRes->usum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticUSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -1908,7 +1908,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->usum += plist[i];
|
pStddevRes->usum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticUSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -5359,7 +5359,7 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
int32_t currentRow = pBlock->info.rows;
|
int32_t currentRow = pBlock->info.rows;
|
||||||
|
|
||||||
int32_t resIndex;
|
int32_t resIndex = -1;
|
||||||
int32_t maxCount = 0;
|
int32_t maxCount = 0;
|
||||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||||
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
|
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
|
||||||
|
@ -5369,8 +5369,12 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
|
if (maxCount != 0) {
|
||||||
colDataAppend(pCol, currentRow, pResItem->data, (maxCount == 0) ? true : false);
|
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
|
||||||
|
colDataAppend(pCol, currentRow, pResItem->data, false);
|
||||||
|
} else {
|
||||||
|
colDataAppendNULL(pCol, currentRow);
|
||||||
|
}
|
||||||
|
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -495,7 +495,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
|
||||||
|
|
||||||
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
|
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
|
||||||
SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
|
SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||||
assert(list->size > 0);
|
ASSERT(list != NULL && list->size > 0);
|
||||||
|
|
||||||
for (int32_t f = 0; f < list->size; ++f) {
|
for (int32_t f = 0; f < list->size; ++f) {
|
||||||
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
|
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
|
||||||
|
|
|
@ -203,7 +203,6 @@ int32_t nodesReleaseAllocator(int64_t allocatorId) {
|
||||||
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
|
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
|
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
|
||||||
if (allocatorId <= 0) {
|
if (allocatorId <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -936,6 +935,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
}
|
}
|
||||||
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
|
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
|
||||||
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pDnodeId);
|
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pDnodeId);
|
||||||
|
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pLikePattern);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||||
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
|
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
|
||||||
|
|
|
@ -176,7 +176,7 @@ SNode* createShowStmtWithCond(SAstCreateContext* pCxt, ENodeType type, SNode* pD
|
||||||
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||||
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable);
|
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable);
|
||||||
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
||||||
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId);
|
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pLikePattern);
|
||||||
SNode* createShowVnodesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pDnodeEndpoint);
|
SNode* createShowVnodesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pDnodeEndpoint);
|
||||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo);
|
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo);
|
||||||
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
|
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
|
||||||
|
|
|
@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC
|
||||||
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
|
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||||
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
|
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
|
||||||
SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
|
SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
|
||||||
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
|
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
|
||||||
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
|
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
|
||||||
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
|
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
|
||||||
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
|
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
|
||||||
|
|
|
@ -410,8 +410,9 @@ cmd ::= SHOW QUERIES.
|
||||||
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT); }
|
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT); }
|
||||||
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT); }
|
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT); }
|
||||||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLES_STMT); }
|
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLES_STMT); }
|
||||||
|
cmd ::= SHOW CLUSTER VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLES_STMT); }
|
||||||
cmd ::= SHOW LOCAL VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT); }
|
cmd ::= SHOW LOCAL VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT); }
|
||||||
cmd ::= SHOW DNODE NK_INTEGER(A) VARIABLES. { pCxt->pRootNode = createShowDnodeVariablesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A)); }
|
cmd ::= SHOW DNODE NK_INTEGER(A) VARIABLES like_pattern_opt(B). { pCxt->pRootNode = createShowDnodeVariablesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), B); }
|
||||||
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT); }
|
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT); }
|
||||||
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); }
|
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); }
|
||||||
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); }
|
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); }
|
||||||
|
|
|
@ -1372,11 +1372,12 @@ SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId) {
|
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pLikePattern) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SShowDnodeVariablesStmt* pStmt = (SShowDnodeVariablesStmt*)nodesMakeNode(QUERY_NODE_SHOW_DNODE_VARIABLES_STMT);
|
SShowDnodeVariablesStmt* pStmt = (SShowDnodeVariablesStmt*)nodesMakeNode(QUERY_NODE_SHOW_DNODE_VARIABLES_STMT);
|
||||||
CHECK_OUT_OF_MEM(pStmt);
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
pStmt->pDnodeId = pDnodeId;
|
pStmt->pDnodeId = pDnodeId;
|
||||||
|
pStmt->pLikePattern = pLikePattern;
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -484,11 +484,6 @@ static int32_t collectMetaKeyFromShowQueries(SCollectMetaKeyCxt* pCxt, SShowStmt
|
||||||
pCxt->pMetaCache);
|
pCxt->pMetaCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t collectMetaKeyFromShowConfigs(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
|
||||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CONFIGS,
|
|
||||||
pCxt->pMetaCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t collectMetaKeyFromShowVariables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
static int32_t collectMetaKeyFromShowVariables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CONFIGS,
|
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CONFIGS,
|
||||||
pCxt->pMetaCache);
|
pCxt->pMetaCache);
|
||||||
|
|
|
@ -125,7 +125,6 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
|
||||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
case QUERY_NODE_SHOW_LICENCES_STMT:
|
case QUERY_NODE_SHOW_LICENCES_STMT:
|
||||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||||
case QUERY_NODE_SHOW_VARIABLES_STMT:
|
|
||||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||||
case QUERY_NODE_SHOW_VNODES_STMT:
|
case QUERY_NODE_SHOW_VNODES_STMT:
|
||||||
|
|
|
@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
|
|
||||||
// merge according to vgId
|
// merge according to vgId
|
||||||
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
|
||||||
}
|
}
|
||||||
return insBuildOutput(pCxt);
|
return insBuildOutput(pCxt);
|
||||||
}
|
}
|
||||||
|
@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) {
|
||||||
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
||||||
// merge according to vgId
|
// merge according to vgId
|
||||||
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
|
||||||
}
|
}
|
||||||
return insBuildOutput(pCxt);
|
return insBuildOutput(pCxt);
|
||||||
}
|
}
|
||||||
|
@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (!context.pComCxt->needMultiParse) {
|
if (!context.pComCxt->needMultiParse) {
|
||||||
code = skipInsertInto(&context.pSql, &context.msg);
|
code = skipInsertInto(&context.pSql, &context.msg);
|
||||||
|
|
|
@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
|
||||||
|
|
||||||
// merge according to vgId
|
// merge according to vgId
|
||||||
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
|
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
|
||||||
CHECK_CODE(
|
CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks));
|
||||||
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CHECK_CODE(insBuildOutput(&insertCtx));
|
CHECK_CODE(insBuildOutput(&insertCtx));
|
||||||
|
|
|
@ -21,9 +21,6 @@
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "tRealloc.h"
|
#include "tRealloc.h"
|
||||||
|
|
||||||
#define IS_RAW_PAYLOAD(t) \
|
|
||||||
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
|
||||||
|
|
||||||
typedef struct SBlockKeyTuple {
|
typedef struct SBlockKeyTuple {
|
||||||
TSKEY skey;
|
TSKEY skey;
|
||||||
void* payloadAddr;
|
void* payloadAddr;
|
||||||
|
@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||||
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
|
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
|
||||||
int32_t columns = getNumOfColumns(pTableMeta);
|
int32_t columns = getNumOfColumns(pTableMeta);
|
||||||
|
@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||||
result += (int32_t)TD_BITMAP_BYTES(columns - 1);
|
result += (int32_t)TD_BITMAP_BYTES(columns - 1);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void insDestroyBlockArrayList(SArray* pDataBlockList) {
|
void insDestroyBlockArrayList(SArray* pDataBlockList) {
|
||||||
if (pDataBlockList == NULL) {
|
if (pDataBlockList == NULL) {
|
||||||
|
@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
|
||||||
taosHashCleanup(pDataBlockHash);
|
taosHashCleanup(pDataBlockHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
// data block is disordered, sort it in ascending order
|
// data block is disordered, sort it in ascending order
|
||||||
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
|
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
|
||||||
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
|
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
|
||||||
|
@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
|
||||||
|
|
||||||
dataBuf->prevTS = INT64_MIN;
|
dataBuf->prevTS = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// data block is disordered, sort it in ascending order
|
// data block is disordered, sort it in ascending order
|
||||||
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
|
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
|
||||||
|
@ -667,68 +667,31 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
|
||||||
}
|
}
|
||||||
|
|
||||||
// Erase the empty space reserved for binary data
|
// Erase the empty space reserved for binary data
|
||||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
|
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
|
||||||
bool isRawPayload) {
|
|
||||||
// TODO: optimize this function, handle the case while binary is not presented
|
// TODO: optimize this function, handle the case while binary is not presented
|
||||||
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
|
||||||
STableComInfo tinfo = getTableInfo(pTableMeta);
|
|
||||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
|
||||||
|
|
||||||
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
|
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
|
||||||
SSubmitBlk* pBlock = pDataBlock;
|
SSubmitBlk* pBlock = pDataBlock;
|
||||||
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
|
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
|
||||||
pDataBlock = (char*)pDataBlock + nonDataLen;
|
pDataBlock = (char*)pDataBlock + nonDataLen;
|
||||||
|
|
||||||
int32_t flen = 0; // original total length of row
|
|
||||||
if (isRawPayload) {
|
|
||||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
|
||||||
flen += TYPE_BYTES[pSchema[j].type];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pBlock->schemaLen = pTableDataBlock->createTbReqLen;
|
pBlock->schemaLen = pTableDataBlock->createTbReqLen;
|
||||||
|
|
||||||
char* p = pTableDataBlock->pData + nonDataLen;
|
|
||||||
pBlock->dataLen = 0;
|
pBlock->dataLen = 0;
|
||||||
|
|
||||||
int32_t numOfRows = pBlock->numOfRows;
|
int32_t numOfRows = pBlock->numOfRows;
|
||||||
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
if (isRawPayload) {
|
void* payload = (blkKeyTuple + i)->payloadAddr;
|
||||||
SRowBuilder builder = {0};
|
TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
|
||||||
|
memcpy(pDataBlock, payload, rowTLen);
|
||||||
tdSRowInit(&builder, pTableMeta->sversion);
|
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||||
tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
|
pBlock->dataLen += rowTLen;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
tdSRowResetBuf(&builder, pDataBlock);
|
|
||||||
int toffset = 0;
|
|
||||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
|
||||||
int8_t colType = pSchema[j].type;
|
|
||||||
uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
|
|
||||||
tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
|
|
||||||
toffset += TYPE_BYTES[colType];
|
|
||||||
p += pSchema[j].bytes;
|
|
||||||
}
|
|
||||||
tdSRowEnd(&builder);
|
|
||||||
int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
|
|
||||||
pDataBlock = (char*)pDataBlock + rowLen;
|
|
||||||
pBlock->dataLen += rowLen;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
|
||||||
void* payload = (blkKeyTuple + i)->payloadAddr;
|
|
||||||
TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
|
|
||||||
memcpy(pDataBlock, payload, rowTLen);
|
|
||||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
|
||||||
pBlock->dataLen += rowTLen;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock->dataLen + pBlock->schemaLen;
|
return pBlock->dataLen + pBlock->schemaLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
|
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
|
||||||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||||
int code = 0;
|
int code = 0;
|
||||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
|
||||||
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
|
||||||
}
|
}
|
||||||
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
|
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
|
||||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
int64_t destSize = dataBuf->size + pOneTableBlock->size +
|
||||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
|
||||||
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
|
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
|
||||||
pOneTableBlock->createTbReqLen;
|
pOneTableBlock->createTbReqLen;
|
||||||
|
|
||||||
|
@ -774,23 +736,18 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isRawPayload) {
|
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
|
||||||
sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
|
tdFreeSBlockRowMerger(pBlkRowMerger);
|
||||||
} else {
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
|
insDestroyBlockArrayList(pVnodeDataBlockList);
|
||||||
tdFreeSBlockRowMerger(pBlkRowMerger);
|
taosMemoryFreeClear(dataBuf->pData);
|
||||||
taosHashCleanup(pVnodeDataBlockHashList);
|
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
|
||||||
insDestroyBlockArrayList(pVnodeDataBlockList);
|
return code;
|
||||||
taosMemoryFreeClear(dataBuf->pData);
|
|
||||||
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
|
|
||||||
}
|
}
|
||||||
|
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
|
||||||
|
|
||||||
// erase the empty space reserved for binary data
|
// erase the empty space reserved for binary data
|
||||||
int32_t finalLen =
|
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
|
||||||
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
|
|
||||||
|
|
||||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||||
|
|
|
@ -6259,16 +6259,28 @@ static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t rewriteShowDnodeVariables(STranslateContext* pCxt, SQuery* pQuery) {
|
static int32_t rewriteShowDnodeVariables(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
SSelectStmt* pStmt = NULL;
|
SShowDnodeVariablesStmt* pStmt = (SShowDnodeVariablesStmt*)pQuery->pRoot;
|
||||||
int32_t code = createSelectStmtForShow(nodeType(pQuery->pRoot), &pStmt);
|
SNode* pDnodeCond = NULL;
|
||||||
|
SNode* pLikeCond = NULL;
|
||||||
|
SSelectStmt* pSelect = NULL;
|
||||||
|
int32_t code = createSelectStmtForShow(nodeType(pQuery->pRoot), &pSelect);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", ((SShowDnodeVariablesStmt*)pQuery->pRoot)->pDnodeId,
|
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pStmt->pDnodeId, &pDnodeCond);
|
||||||
&pStmt->pWhere);
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createOperatorNode(OP_TYPE_LIKE, "name", pStmt->pLikePattern, &pLikeCond);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pLikeCond) {
|
||||||
|
code = createLogicCondNode(pDnodeCond, pLikeCond, &pSelect->pWhere);
|
||||||
|
} else {
|
||||||
|
pSelect->pWhere = pDnodeCond;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pQuery->showRewrite = true;
|
pQuery->showRewrite = true;
|
||||||
nodesDestroyNode(pQuery->pRoot);
|
nodesDestroyNode(pQuery->pRoot);
|
||||||
pQuery->pRoot = (SNode*)pStmt;
|
pQuery->pRoot = (SNode*)pSelect;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -100,6 +100,8 @@ TEST_F(ParserShowToUseTest, showDnodeVariables) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("SHOW DNODE 1 VARIABLES");
|
run("SHOW DNODE 1 VARIABLES");
|
||||||
|
|
||||||
|
run("SHOW DNODE 1 VARIABLES LIKE '%debug%'");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ParserShowToUseTest, showFunctions) {
|
TEST_F(ParserShowToUseTest, showFunctions) {
|
||||||
|
|
|
@ -2365,7 +2365,7 @@ static int32_t mergeProjectsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
||||||
return mergeProjectsOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
|
return mergeProjectsOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tagScanMayBeOptimized(SLogicNode* pNode) {
|
static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || (SCAN_TYPE_TAG == ((SScanLogicNode*)pNode)->scanType)) {
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || (SCAN_TYPE_TAG == ((SScanLogicNode*)pNode)->scanType)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2397,7 +2397,7 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanMayBeOptimized);
|
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
|
||||||
if (NULL == pScanNode) {
|
if (NULL == pScanNode) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2440,6 +2440,29 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
|
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, pushDownLimitOptShouldBeOptimized);
|
||||||
|
if (NULL == pNode) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||||
|
nodesDestroyNode(pChild->pLimit);
|
||||||
|
pChild->pLimit = pNode->pLimit;
|
||||||
|
pNode->pLimit = NULL;
|
||||||
|
pCxt->optimized = true;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
static const SOptimizeRule optimizeRuleSet[] = {
|
static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
||||||
|
@ -2453,7 +2476,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
||||||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||||
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
|
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
|
||||||
{.pName = "TagScan", .optimizeFunc = tagScanOptimize}
|
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
|
||||||
|
// {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -84,6 +84,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
|
||||||
if (NULL == pExchange) {
|
if (NULL == pExchange) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
pExchange->srcStartGroupId = pCxt->groupId;
|
pExchange->srcStartGroupId = pCxt->groupId;
|
||||||
pExchange->srcEndGroupId = pCxt->groupId;
|
pExchange->srcEndGroupId = pCxt->groupId;
|
||||||
pExchange->node.precision = pChild->precision;
|
pExchange->node.precision = pChild->precision;
|
||||||
|
@ -91,6 +92,13 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
|
||||||
if (NULL == pExchange->node.pTargets) {
|
if (NULL == pExchange->node.pTargets) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
if (NULL != pChild->pLimit) {
|
||||||
|
pExchange->node.pLimit = nodesCloneNode(pChild->pLimit);
|
||||||
|
if (NULL == pExchange->node.pLimit) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
((SLimitNode*)pChild->pLimit)->offset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
*pOutput = pExchange;
|
*pOutput = pExchange;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -921,6 +929,13 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp
|
||||||
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
|
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
|
||||||
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
|
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
|
||||||
pSplitNode = pInfo->pSplitNode->pParent;
|
pSplitNode = pInfo->pSplitNode->pParent;
|
||||||
|
if (NULL != pInfo->pSplitNode->pLimit) {
|
||||||
|
pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
|
||||||
|
if (NULL == pSplitNode->pLimit) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
|
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -95,9 +95,13 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("SELECT c1, sum(c3) FROM t1 GROUP BY c1");
|
run("SELECT c1, sum(c3) FROM t1 GROUP BY c1");
|
||||||
|
|
||||||
run("SELECT c1 FROM t1");
|
run("SELECT c1 FROM t1");
|
||||||
|
|
||||||
run("SELECT * FROM st1");
|
run("SELECT * FROM st1");
|
||||||
|
|
||||||
run("SELECT c1 FROM st1s3");
|
run("SELECT c1 FROM st1s3");
|
||||||
|
|
||||||
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
|
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,4 +140,14 @@ TEST_F(PlanOptimizeTest, tagScan) {
|
||||||
run("select tag1 from st1 group by tag1");
|
run("select tag1 from st1 group by tag1");
|
||||||
run("select distinct tag1 from st1");
|
run("select distinct tag1 from st1");
|
||||||
run("select tag1*tag1 from st1 group by tag1*tag1");
|
run("select tag1*tag1 from st1 group by tag1*tag1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanOptimizeTest, pushDownLimit) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("SELECT c1 FROM t1 LIMIT 1");
|
||||||
|
|
||||||
|
run("SELECT c1 FROM st1 LIMIT 1");
|
||||||
|
|
||||||
|
run("SELECT c1 FROM st1 LIMIT 20 OFFSET 10");
|
||||||
|
}
|
||||||
|
|
|
@ -85,6 +85,8 @@ TEST_F(PlanOtherTest, show) {
|
||||||
|
|
||||||
run("SHOW DNODE 1 VARIABLES");
|
run("SHOW DNODE 1 VARIABLES");
|
||||||
|
|
||||||
|
run("SHOW DNODE 1 VARIABLES LIKE '%debug%'");
|
||||||
|
|
||||||
run("SHOW TAGS FROM st1s1");
|
run("SHOW TAGS FROM st1s1");
|
||||||
|
|
||||||
run("SHOW TABLE TAGS FROM st1");
|
run("SHOW TABLE TAGS FROM st1");
|
||||||
|
|
|
@ -1082,7 +1082,12 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left,
|
||||||
if (info->unitNum >= info->unitSize) {
|
if (info->unitNum >= info->unitSize) {
|
||||||
uint32_t psize = info->unitSize;
|
uint32_t psize = info->unitSize;
|
||||||
info->unitSize += FILTER_DEFAULT_UNIT_SIZE;
|
info->unitSize += FILTER_DEFAULT_UNIT_SIZE;
|
||||||
info->units = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit));
|
|
||||||
|
void *tmp = taosMemoryRealloc(info->units, info->unitSize * sizeof(SFilterUnit));
|
||||||
|
if (tmp == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
info->units = tmp;
|
||||||
memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE);
|
memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1135,7 +1140,12 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
|
||||||
int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
|
int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
|
||||||
if (group->unitNum >= group->unitSize) {
|
if (group->unitNum >= group->unitSize) {
|
||||||
group->unitSize += FILTER_DEFAULT_UNIT_SIZE;
|
group->unitSize += FILTER_DEFAULT_UNIT_SIZE;
|
||||||
group->unitIdxs = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs));
|
|
||||||
|
void *tmp = taosMemoryRealloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs));
|
||||||
|
if (tmp == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
group->unitIdxs = tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
group->unitIdxs[group->unitNum++] = unitIdx;
|
group->unitIdxs[group->unitNum++] = unitIdx;
|
||||||
|
@ -3712,7 +3722,7 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
|
||||||
SListCell *cell = node->pParameterList->pHead;
|
SListCell *cell = node->pParameterList->pHead;
|
||||||
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
|
for (int32_t i = 0; i < node->pParameterList->length; ++i) {
|
||||||
if (NULL == cell || NULL == cell->pNode) {
|
if (NULL == cell || NULL == cell->pNode) {
|
||||||
fltError("invalid cell, cell:%p, pNode:%p", cell, cell->pNode);
|
fltError("invalid cell");
|
||||||
stat->code = TSDB_CODE_QRY_INVALID_INPUT;
|
stat->code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -4066,6 +4076,10 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
|
||||||
*p = output.columnData;
|
*p = output.columnData;
|
||||||
output.numOfRows = pSrc->info.rows;
|
output.numOfRows = pSrc->info.rows;
|
||||||
|
|
||||||
|
if (*p == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
|
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
|
||||||
|
|
||||||
// todo this should be return during filter procedure
|
// todo this should be return during filter procedure
|
||||||
|
|
|
@ -331,7 +331,10 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
|
|
||||||
ASSERT(param->columnData == NULL);
|
ASSERT(param->columnData == NULL);
|
||||||
param->numOfRows = 1;
|
param->numOfRows = 1;
|
||||||
/*int32_t code = */ sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
|
int32_t code = sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
SCL_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
|
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
|
||||||
colDataAppendNULL(param->columnData, 0);
|
colDataAppendNULL(param->columnData, 0);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1485,8 +1488,13 @@ static int32_t sclGetMinusOperatorResType(SOperatorNode *pOp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
|
static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
|
||||||
|
if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
|
|
||||||
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
||||||
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
||||||
|
|
||||||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
|
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
|
||||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
|
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
|
||||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
|
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
|
||||||
|
@ -1507,10 +1515,21 @@ static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
|
static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
|
||||||
|
if (pOp == NULL || pOp->pLeft == NULL) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
|
|
||||||
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
||||||
|
|
||||||
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
||||||
|
if (pOp->pRight == NULL) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
((SExprNode *)(pOp->pRight))->resType = ldt;
|
((SExprNode *)(pOp->pRight))->resType = ldt;
|
||||||
} else if (nodesIsRegularOp(pOp)) {
|
} else if (nodesIsRegularOp(pOp)) {
|
||||||
|
if (pOp->pRight == NULL) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
||||||
if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
|
if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
|
||||||
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
|
(!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
|
||||||
|
@ -1523,8 +1542,13 @@ static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) {
|
static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) {
|
||||||
|
if (pOp == NULL || pOp->pLeft == NULL || pOp->pRight == NULL) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
|
|
||||||
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
|
||||||
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
|
||||||
|
|
||||||
if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) {
|
if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1416,7 +1416,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
||||||
timeVal[k] = timeVal[k] * 1000;
|
timeVal[k] = timeVal[k] * 1000;
|
||||||
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||||
timeVal[k] = timeVal[k];
|
timeVal[k] = timeVal[k] * 1;
|
||||||
} else {
|
} else {
|
||||||
hasNull = true;
|
hasNull = true;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -344,8 +344,11 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn
|
||||||
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
||||||
|
|
||||||
char *t = taosMemoryCalloc(1, outputMaxLen);
|
char *t = taosMemoryCalloc(1, outputMaxLen);
|
||||||
/*int32_t resLen = */ taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t),
|
int32_t ret = taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t),
|
||||||
outputMaxLen - VARSTR_HEADER_SIZE, &len);
|
outputMaxLen - VARSTR_HEADER_SIZE, &len);
|
||||||
|
if (!ret) {
|
||||||
|
sclError("failed to convert to NCHAR");
|
||||||
|
}
|
||||||
varDataSetLen(t, len);
|
varDataSetLen(t, len);
|
||||||
|
|
||||||
colDataAppend(pOut->columnData, rowIndex, t, false);
|
colDataAppend(pOut->columnData, rowIndex, t, false);
|
||||||
|
|
|
@ -45,6 +45,11 @@
|
||||||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||||
//
|
//
|
||||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
sError("pSyncNode is NULL");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
|
syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
|
||||||
return;
|
return;
|
||||||
|
@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||||
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
||||||
return pSyncNode->quorum;
|
return pSyncNode->quorum;
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t quorum = 1; // self
|
int32_t quorum = 1; // self
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
int64_t timeNow = taosGetTimestampMs();
|
||||||
|
@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return quorum;
|
return quorum;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
|
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
|
||||||
pEpSet->eps[i].port);
|
pEpSet->eps[i].port);
|
||||||
}
|
}
|
||||||
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
if (pEpSet->numOfEps > 0) {
|
||||||
|
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||||
|
}
|
||||||
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
syncNodeEventLog(pSyncNode, "sync close");
|
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
|
|
||||||
|
syncNodeEventLog(pSyncNode, "sync close");
|
||||||
|
|
||||||
ret = raftStoreClose(pSyncNode->pRaftStore);
|
ret = raftStoreClose(pSyncNode->pRaftStore);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t userStrLen = strlen(str);
|
int32_t userStrLen = strlen(str);
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
|
@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
|
||||||
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
|
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
||||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// htonl
|
// htonl
|
||||||
SMsgHead* pHead = rpcMsg.pCont;
|
SMsgHead* pHead = rpcMsg.pCont;
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
*/
|
*/
|
||||||
|
|
||||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||||
|
syncPingReplyDestroy(pMsgReply);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||||
|
|
||||||
// reply
|
// reply
|
||||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||||
|
syncHeartbeatReplyDestroy(pMsgReply);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// advance commit index to sanpshot first
|
if (ths == NULL) {
|
||||||
SSnapshot snapshot = {0};
|
return -1;
|
||||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
}
|
||||||
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
|
|
||||||
char eventLog[128];
|
|
||||||
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
|
|
||||||
snapshot.lastApplyIndex);
|
|
||||||
syncNodeEventLog(ths, eventLog);
|
|
||||||
|
|
||||||
// update begin index
|
if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
beginIndex = snapshot.lastApplyIndex + 1;
|
// advance commit index to sanpshot first
|
||||||
|
SSnapshot snapshot = {0};
|
||||||
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
|
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
|
||||||
|
snapshot.lastApplyIndex);
|
||||||
|
syncNodeEventLog(ths, eventLog);
|
||||||
|
|
||||||
|
// update begin index
|
||||||
|
beginIndex = snapshot.lastApplyIndex + 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
|
|
||||||
// config change finish
|
// config change finish
|
||||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||||
code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
|
if (rpcMsg.pCont != NULL) {
|
||||||
ASSERT(code == 0);
|
code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
|
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
|
||||||
if (pSender->start) {
|
if (pSender != NULL && pSender->start) {
|
||||||
sError("sync cannot change3");
|
sError("sync cannot change3");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
|
||||||
pMsg->bytes = bytes;
|
pMsg->bytes = bytes;
|
||||||
|
|
||||||
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ASSERT(len = pMsg->dataLen);
|
ASSERT(len == pMsg->dataLen);
|
||||||
memcpy(pMsg->data, data, len);
|
memcpy(pMsg->data, data, len);
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
|
||||||
pMsg->bytes = bytes;
|
pMsg->bytes = bytes;
|
||||||
|
|
||||||
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ASSERT(len = pMsg->dataLen);
|
ASSERT(len == pMsg->dataLen);
|
||||||
memcpy(pMsg->data, data, len);
|
memcpy(pMsg->data, data, len);
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
|
@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
||||||
if (code == 1) {
|
if (code == 1) {
|
||||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
*ppEntry = taosMemoryMalloc((int64_t)(pEntry->bytes));
|
||||||
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
||||||
(*ppEntry)->rid = -1;
|
(*ppEntry)->rid = -1;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* syncUtilprintBin(char* ptr, uint32_t len) {
|
char* syncUtilprintBin(char* ptr, uint32_t len) {
|
||||||
char* s = taosMemoryMalloc(len + 1);
|
int64_t memLen = (int64_t)(len + 1);
|
||||||
|
char* s = taosMemoryMalloc(memLen);
|
||||||
ASSERT(s != NULL);
|
ASSERT(s != NULL);
|
||||||
memset(s, 0, len + 1);
|
memset(s, 0, len + 1);
|
||||||
memcpy(s, ptr, len);
|
memcpy(s, ptr, len);
|
||||||
|
|
|
@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
} else {
|
} else {
|
||||||
tError("fail to dispatch conn to work thread");
|
tError("fail to dispatch conn to work thread");
|
||||||
}
|
}
|
||||||
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
if (!uv_is_closing((uv_handle_t*)req->data)) {
|
||||||
|
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(req->data);
|
||||||
|
}
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
uv_tcp_init(pObj->loop, cli);
|
uv_tcp_init(pObj->loop, cli);
|
||||||
|
|
||||||
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
||||||
|
#ifdef WINDOWS
|
||||||
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
|
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
|
||||||
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
|
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
|
||||||
pObj->numOfWorkerReady);
|
pObj->numOfWorkerReady);
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
wr->data = cli;
|
wr->data = cli;
|
||||||
|
@ -668,7 +674,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
||||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
if (!uv_is_closing((uv_handle_t*)cli)) {
|
||||||
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(cli);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -681,7 +691,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
tWarn("failed to create connect:%p", q);
|
tWarn("failed to create connect:%p", q);
|
||||||
taosMemoryFree(buf->base);
|
taosMemoryFree(buf->base);
|
||||||
uv_close((uv_handle_t*)q, NULL);
|
uv_close((uv_handle_t*)q, NULL);
|
||||||
// taosMemoryFree(q);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// free memory allocated by
|
// free memory allocated by
|
||||||
|
@ -770,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef WINDOWS
|
||||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||||
|
#else
|
||||||
|
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
||||||
|
uv_pipe_open(pThrd->pipe, pThrd->fd);
|
||||||
|
#endif
|
||||||
|
|
||||||
pThrd->pipe->data = pThrd;
|
pThrd->pipe->data = pThrd;
|
||||||
|
|
||||||
|
@ -785,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||||
QUEUE_INIT(&pThrd->conn);
|
QUEUE_INIT(&pThrd->conn);
|
||||||
|
|
||||||
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
|
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
|
||||||
|
#ifdef WINDOWS
|
||||||
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
|
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
|
||||||
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
#else
|
||||||
|
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
|
#endif
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -958,20 +975,19 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
srv->port = port;
|
srv->port = port;
|
||||||
uv_loop_init(srv->loop);
|
uv_loop_init(srv->loop);
|
||||||
|
|
||||||
|
char pipeName[PATH_MAX];
|
||||||
|
#ifdef WINDOWS
|
||||||
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("failed to init pipe, errmsg: %s", uv_err_name(ret));
|
tError("failed to init pipe, errmsg: %s", uv_err_name(ret));
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WINDOWS
|
|
||||||
char pipeName[64];
|
|
||||||
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
|
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
|
||||||
#else
|
// char pipeName[PATH_MAX] = {0};
|
||||||
char pipeName[PATH_MAX] = {0};
|
// snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
|
||||||
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
|
// taosGetSelfPthreadId());
|
||||||
taosGetSelfPthreadId());
|
|
||||||
#endif
|
|
||||||
ret = uv_pipe_bind(&srv->pipeListen, pipeName);
|
ret = uv_pipe_bind(&srv->pipeListen, pipeName);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("failed to bind pipe, errmsg: %s", uv_err_name(ret));
|
tError("failed to bind pipe, errmsg: %s", uv_err_name(ret));
|
||||||
|
@ -997,6 +1013,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
|
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("success to create worker-thread:%d", i);
|
tDebug("success to create worker-thread:%d", i);
|
||||||
|
@ -1006,14 +1023,54 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
|
||||||
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
|
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
||||||
|
|
||||||
|
thrd->pTransInst = shandle;
|
||||||
|
thrd->quit = false;
|
||||||
|
thrd->pTransInst = shandle;
|
||||||
|
|
||||||
|
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||||
|
srv->pThreadObj[i] = thrd;
|
||||||
|
|
||||||
|
uv_os_sock_t fds[2];
|
||||||
|
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
||||||
|
uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
|
||||||
|
|
||||||
|
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||||
|
thrd->fd = fds[0];
|
||||||
|
|
||||||
|
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
|
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
|
||||||
|
if (err == 0) {
|
||||||
|
tDebug("success to create worker-thread:%d", i);
|
||||||
|
} else {
|
||||||
|
// TODO: clear all other resource later
|
||||||
|
tError("failed to create worker-thread:%d", i);
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (false == addHandleToAcceptloop(srv)) {
|
if (false == addHandleToAcceptloop(srv)) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
|
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("success to create accept-thread");
|
tDebug("success to create accept-thread");
|
||||||
|
@ -1022,6 +1079,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
goto End;
|
goto End;
|
||||||
// clear all resource later
|
// clear all resource later
|
||||||
}
|
}
|
||||||
|
|
||||||
srv->inited = true;
|
srv->inited = true;
|
||||||
return srv;
|
return srv;
|
||||||
End:
|
End:
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tfunctional.h"
|
#include "tfunctional.h"
|
||||||
|
|
||||||
|
FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); }
|
||||||
|
|
||||||
|
#if 0
|
||||||
tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int32_t numOfArgs) {
|
tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int32_t numOfArgs) {
|
||||||
tGenericSavedFunc* pSavedFunc = taosMemoryMalloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*)));
|
tGenericSavedFunc* pSavedFunc = taosMemoryMalloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*)));
|
||||||
if (pSavedFunc == NULL) return NULL;
|
if (pSavedFunc == NULL) return NULL;
|
||||||
|
@ -37,10 +40,9 @@ tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int32_t numOfArgs) {
|
||||||
return pSavedFunc;
|
return pSavedFunc;
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); }
|
|
||||||
|
|
||||||
FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); }
|
FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); }
|
||||||
|
|
||||||
FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) {
|
FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) {
|
||||||
if (pSavedFunc) pSavedFunc->func(pSavedFunc->args);
|
if (pSavedFunc) pSavedFunc->func(pSavedFunc->args);
|
||||||
}
|
}
|
||||||
|
#endif
|
|
@ -137,10 +137,9 @@ sql_error show create database d2
|
||||||
sql show create table d2.stb2;
|
sql show create table d2.stb2;
|
||||||
sql show create table d2.ctb2;
|
sql show create table d2.ctb2;
|
||||||
sql show create table d2.ntb2;
|
sql show create table d2.ntb2;
|
||||||
sql_error show variables;
|
|
||||||
sql show local variables;
|
sql show local variables;
|
||||||
sql_error show dnode 1 variables;
|
sql_error show dnode 1 variables;
|
||||||
sql_error show variables;
|
sql show variables;
|
||||||
|
|
||||||
|
|
||||||
print =============== check information_schema
|
print =============== check information_schema
|
||||||
|
@ -167,7 +166,7 @@ sql select * from information_schema.ins_subscriptions
|
||||||
sql select * from information_schema.ins_streams
|
sql select * from information_schema.ins_streams
|
||||||
sql_error select * from information_schema.ins_grants
|
sql_error select * from information_schema.ins_grants
|
||||||
sql_error select * from information_schema.ins_vgroups
|
sql_error select * from information_schema.ins_vgroups
|
||||||
sql_error select * from information_schema.ins_configs
|
sql select * from information_schema.ins_configs
|
||||||
sql_error select * from information_schema.ins_dnode_variables
|
sql_error select * from information_schema.ins_dnode_variables
|
||||||
|
|
||||||
print =============== check performance_schema
|
print =============== check performance_schema
|
||||||
|
|
|
@ -104,7 +104,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -251,7 +251,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
|
|
@ -104,7 +104,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -251,7 +251,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
|
|
@ -104,7 +104,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -149,7 +149,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
@ -252,7 +252,7 @@ class TDTestCase:
|
||||||
'rowsPerTbl': 1000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 100,
|
'batchNum': 100,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 3,
|
'pollDelay': 10,
|
||||||
'showMsg': 1,
|
'showMsg': 1,
|
||||||
'showRow': 1,
|
'showRow': 1,
|
||||||
'snapshot': 0}
|
'snapshot': 0}
|
||||||
|
|
Loading…
Reference in New Issue