Merge branch 'develop' into test/TD-3433
This commit is contained in:
commit
c2da6d215a
|
@ -39,14 +39,14 @@ def pre_test(){
|
|||
sudo rmtaos || echo "taosd has not installed"
|
||||
'''
|
||||
sh '''
|
||||
|
||||
killall -9 taosd ||echo "no taosd running"
|
||||
killall -9 gdb || echo "no gdb running"
|
||||
cd ${WKC}
|
||||
git checkout develop
|
||||
git reset --hard HEAD~10 >/dev/null
|
||||
git pull >/dev/null
|
||||
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
||||
git checkout -qf FETCH_HEAD
|
||||
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile'
|
||||
find ${WKC}/tests/pytest -name \'*\'.sql -exec rm -rf {} \\;
|
||||
cd ${WK}
|
||||
git reset --hard HEAD~10
|
||||
|
|
|
@ -111,9 +111,10 @@ taos>
|
|||
|
||||
**提示:**
|
||||
|
||||
- 任何已经加入集群在线的数据节点,都可以作为后续待加入节点的firstEP。
|
||||
- firstEp这个参数仅仅在该数据节点首次加入集群时有作用,加入集群后,该数据节点会保存最新的mnode的End Point列表,不再依赖这个参数。
|
||||
- 两个没有配置firstEp参数的数据节点dnode启动后,会独立运行起来。这个时候,无法将其中一个数据节点加入到另外一个数据节点,形成集群。**无法将两个独立的集群合并成为新的集群**。
|
||||
- 任何已经加入集群在线的数据节点,都可以作为后续待加入节点的 firstEP。
|
||||
- firstEp 这个参数仅仅在该数据节点首次加入集群时有作用,加入集群后,该数据节点会保存最新的 mnode 的 End Point 列表,不再依赖这个参数。
|
||||
- 接下来,配置文件中的 firstEp 参数就主要在客户端连接的时候使用了,例如 taos shell 如果不加参数,会默认连接由 firstEp 指定的节点。
|
||||
- 两个没有配置 firstEp 参数的数据节点 dnode 启动后,会独立运行起来。这个时候,无法将其中一个数据节点加入到另外一个数据节点,形成集群。**无法将两个独立的集群合并成为新的集群**。
|
||||
|
||||
## <a class="anchor" id="management"></a>数据节点管理
|
||||
|
||||
|
|
|
@ -83,6 +83,22 @@ typedef struct SJoinSupporter {
|
|||
SArray* pVgroupTables;
|
||||
} SJoinSupporter;
|
||||
|
||||
|
||||
typedef struct SMergeCtx {
|
||||
SJoinSupporter* p;
|
||||
int32_t idx;
|
||||
SArray* res;
|
||||
int8_t compared;
|
||||
}SMergeCtx;
|
||||
|
||||
typedef struct SMergeTsCtx {
|
||||
SJoinSupporter* p;
|
||||
STSBuf* res;
|
||||
int64_t numOfInput;
|
||||
int8_t compared;
|
||||
}SMergeTsCtx;
|
||||
|
||||
|
||||
typedef struct SVgroupTableInfo {
|
||||
SVgroupInfo vgInfo;
|
||||
SArray* itemList; //SArray<STableIdInfo>
|
||||
|
@ -183,6 +199,7 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deep
|
|||
void tscSqlExprInfoDestroy(SArray* pExprInfo);
|
||||
|
||||
SColumn* tscColumnClone(const SColumn* src);
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex);
|
||||
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
|
||||
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
|
||||
void tscColumnListDestroy(SArray* pColList);
|
||||
|
|
|
@ -142,15 +142,15 @@ typedef struct SCond {
|
|||
} SCond;
|
||||
|
||||
typedef struct SJoinNode {
|
||||
char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t uid;
|
||||
int16_t tagColId;
|
||||
SArray* tsJoin;
|
||||
SArray* tagJoin;
|
||||
} SJoinNode;
|
||||
|
||||
typedef struct SJoinInfo {
|
||||
bool hasJoin;
|
||||
SJoinNode left;
|
||||
SJoinNode right;
|
||||
bool hasJoin;
|
||||
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
|
||||
} SJoinInfo;
|
||||
|
||||
typedef struct STagCond {
|
||||
|
|
|
@ -977,12 +977,18 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSl
|
|||
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pTableName, SSqlObj* pSql) {
|
||||
const char* msg1 = "name too long";
|
||||
const char* msg2 = "acctId too long";
|
||||
const char* msg3 = "no acctId";
|
||||
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (hasSpecifyDB(pTableName)) { // db has been specified in sql string so we ignore current db path
|
||||
code = tNameSetAcctId(&pTableMetaInfo->name, getAccountId(pSql));
|
||||
char* acctId = getAccountId(pSql);
|
||||
if (acctId == NULL || strlen(acctId) <= 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
code = tNameSetAcctId(&pTableMetaInfo->name, acctId);
|
||||
if (code != 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
@ -3353,24 +3359,26 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSq
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
const char* msg1 = "invalid join query condition";
|
||||
const char* msg2 = "invalid table name in join query";
|
||||
static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
int32_t code = 0;
|
||||
const char* msg1 = "timestamp required for join tables";
|
||||
const char* msg3 = "type of join columns must be identical";
|
||||
const char* msg4 = "invalid column name in join condition";
|
||||
const char* msg5 = "only support one join tag for each table";
|
||||
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!tSqlExprIsParentOfLeaf(pExpr)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
code = checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pRight);
|
||||
}
|
||||
|
||||
STagCond* pTagCond = &pQueryInfo->tagCond;
|
||||
SJoinNode* pLeft = &pTagCond->joinInfo.left;
|
||||
SJoinNode* pRight = &pTagCond->joinInfo.right;
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
|
@ -3379,13 +3387,28 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
pLeft->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
pLeft->tagColId = pTagSchema1->colId;
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
|
||||
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pLeft->tableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*leftNode == NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
(*leftNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
(*leftNode)->tagColId = pTagSchema1->colId;
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int16_t leftIdx = index.tableIndex;
|
||||
|
||||
|
||||
index = (SColumnIndex)COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3395,20 +3418,55 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
|
|||
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
|
||||
|
||||
pRight->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
pRight->tagColId = pTagSchema2->colId;
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
|
||||
code = tNameExtractFullName(&pTableMetaInfo->name, pRight->tableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*rightNode == NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
(*rightNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
(*rightNode)->tagColId = pTagSchema2->colId;
|
||||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
|
||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int16_t rightIdx = index.tableIndex;
|
||||
|
||||
if (pTagSchema1->type != pTagSchema2->type) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
pTagCond->joinInfo.hasJoin = true;
|
||||
if ((*leftNode)->tagJoin == NULL) {
|
||||
(*leftNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
if ((*rightNode)->tagJoin == NULL) {
|
||||
(*rightNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
taosArrayPush((*leftNode)->tagJoin, &rightIdx);
|
||||
taosArrayPush((*rightNode)->tagJoin, &leftIdx);
|
||||
|
||||
pQueryInfo->tagCond.joinInfo.hasJoin = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr);
|
||||
}
|
||||
|
||||
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList,
|
||||
|
@ -3674,7 +3732,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
const char* msg1 = "table query cannot use tags filter";
|
||||
const char* msg2 = "illegal column name";
|
||||
const char* msg3 = "only one query time range allowed";
|
||||
const char* msg4 = "only one join condition allowed";
|
||||
const char* msg4 = "too many join tables";
|
||||
const char* msg5 = "not support ordinary column join";
|
||||
const char* msg6 = "only one query condition on tbname allowed";
|
||||
const char* msg7 = "only in/like allowed in filter table name";
|
||||
|
@ -3705,6 +3763,47 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY);
|
||||
pCondExpr->tsJoin = true;
|
||||
|
||||
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
|
||||
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*leftNode == NULL) {
|
||||
*leftNode = calloc(1, sizeof(SJoinNode));
|
||||
if (*leftNode == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
int16_t leftIdx = index.tableIndex;
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, &pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
if (index.tableIndex < 0 || index.tableIndex >= TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
|
||||
if (*rightNode == NULL) {
|
||||
*rightNode = calloc(1, sizeof(SJoinNode));
|
||||
if (*rightNode == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
int16_t rightIdx = index.tableIndex;
|
||||
|
||||
if ((*leftNode)->tsJoin == NULL) {
|
||||
(*leftNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
if ((*rightNode)->tsJoin == NULL) {
|
||||
(*rightNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
|
||||
}
|
||||
|
||||
taosArrayPush((*leftNode)->tsJoin, &rightIdx);
|
||||
taosArrayPush((*rightNode)->tsJoin, &leftIdx);
|
||||
|
||||
/*
|
||||
* to release expression, e.g., m1.ts = m2.ts,
|
||||
* since this expression is used to set the join query type
|
||||
|
@ -3762,10 +3861,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
|
|||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pCondExpr->pJoinExpr != NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_QUERY;
|
||||
ret = setExprToCond(&pCondExpr->pJoinExpr, *pExpr, NULL, parentOptr, pQueryInfo->msg);
|
||||
*pExpr = NULL;
|
||||
|
@ -3993,7 +4088,8 @@ static bool validateFilterExpr(SQueryInfo* pQueryInfo) {
|
|||
static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
const char* msg0 = "invalid timestamp";
|
||||
const char* msg1 = "only one time stamp window allowed";
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -4003,8 +4099,11 @@ static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlE
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
|
||||
code = getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pRight);
|
||||
} else {
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
|
@ -4085,6 +4184,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
|
@ -4107,6 +4207,7 @@ static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInf
|
|||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
|
||||
const char *msg1 = "invalid tag operator";
|
||||
|
@ -4250,6 +4351,102 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t validateJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
|
||||
const char* msg1 = "timestamp required for join tables";
|
||||
const char* msg2 = "tag required for join stables";
|
||||
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
|
||||
|
||||
if (node == NULL || node->tsJoin == NULL || taosArrayGetSize(node->tsJoin) <= 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg1);
|
||||
}
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
|
||||
|
||||
if (node == NULL || node->tagJoin == NULL || taosArrayGetSize(node->tagJoin) <= 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void mergeJoinNodesImpl(int8_t* r, int8_t* p, int16_t* tidx, SJoinNode** nodes, int32_t type) {
|
||||
SJoinNode *node = nodes[*tidx];
|
||||
SArray* arr = (type == 0) ? node->tsJoin : node->tagJoin;
|
||||
size_t size = taosArrayGetSize(arr);
|
||||
|
||||
p[*tidx] = 1;
|
||||
|
||||
for (int32_t j = 0; j < size; j++) {
|
||||
int16_t* idx = taosArrayGet(arr, j);
|
||||
r[*idx] = 1;
|
||||
if (p[*idx] == 0) {
|
||||
mergeJoinNodesImpl(r, p, idx, nodes, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mergeJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
|
||||
const char* msg1 = "not all join tables have same timestamp";
|
||||
const char* msg2 = "not all join tables have same tag";
|
||||
|
||||
int8_t r[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
int8_t p[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 0);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->tagCond.joinInfo.joinTables[0]->tsJoin) != pQueryInfo->numOfTables) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg1);
|
||||
}
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 1);
|
||||
|
||||
taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin);
|
||||
|
||||
for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) {
|
||||
if (r[j]) {
|
||||
taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin, &j);
|
||||
}
|
||||
}
|
||||
|
||||
memset(r, 0, sizeof(r));
|
||||
memset(p, 0, sizeof(p));
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pQueryInfo->tagCond.joinInfo.joinTables[0]->tagJoin) != pQueryInfo->numOfTables) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) {
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4295,17 +4492,17 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
|
|||
|
||||
// 4. get the table name query condition
|
||||
if ((ret = getTablenameCond(&pSql->cmd, pQueryInfo, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 5. other column query condition
|
||||
if ((ret = getColumnQueryCondInfo(&pSql->cmd, pQueryInfo, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 6. join condition
|
||||
if ((ret = getJoinCondInfo(&pSql->cmd, pQueryInfo, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
// 7. query condition for table name
|
||||
|
@ -4313,12 +4510,29 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
|
|||
|
||||
ret = setTableCondForSTableQuery(&pSql->cmd, pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
|
||||
taosStringBuilderDestroy(&sb);
|
||||
|
||||
if (!validateFilterExpr(pQueryInfo)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
|
||||
if (!validateFilterExpr(pQueryInfo)) {
|
||||
ret = invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
//doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
|
||||
if (condExpr.tsJoin) {
|
||||
ret = validateJoinNodes(pQueryInfo, pSql);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
|
||||
ret = mergeJoinNodes(pQueryInfo, pSql);
|
||||
if (ret) {
|
||||
goto PARSE_WHERE_EXIT;
|
||||
}
|
||||
}
|
||||
|
||||
PARSE_WHERE_EXIT:
|
||||
|
||||
cleanQueryExpr(&condExpr);
|
||||
return ret;
|
||||
|
@ -6531,7 +6745,6 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
const char* msg1 = "point interpolation query needs timestamp";
|
||||
const char* msg2 = "fill only available for interval query";
|
||||
const char* msg3 = "start(end) time of query range required or time range too large";
|
||||
const char* msg4 = "illegal number of tables in from clause";
|
||||
const char* msg5 = "too many columns in selection clause";
|
||||
const char* msg6 = "too many tables in from clause";
|
||||
const char* msg7 = "invalid table alias name";
|
||||
|
@ -6568,14 +6781,11 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
}
|
||||
|
||||
size_t fromSize = taosArrayGetSize(pQuerySqlNode->from->tableList);
|
||||
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM * 2) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||
if (fromSize > 2) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
|
||||
// set all query tables, which are maybe more than one.
|
||||
for (int32_t i = 0; i < fromSize; ++i) {
|
||||
|
|
|
@ -613,7 +613,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
|
|||
tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
|
||||
}
|
||||
|
||||
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
||||
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg, int32_t *succeed) {
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
||||
TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
|
||||
|
||||
|
@ -626,9 +626,14 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
assert(index >= 0);
|
||||
|
||||
SVgroupInfo* pVgroupInfo = NULL;
|
||||
if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
|
||||
if (pTableMetaInfo->vgroupList && pTableMetaInfo->vgroupList->numOfVgroups > 0) {
|
||||
assert(index < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
|
||||
} else {
|
||||
tscError("%p No vgroup info found", pSql);
|
||||
|
||||
*succeed = 0;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
vgId = pVgroupInfo->vgId;
|
||||
|
@ -948,8 +953,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->secondStageOutput = 0;
|
||||
}
|
||||
|
||||
int32_t succeed = 1;
|
||||
|
||||
// serialize the table info (sid, uid, tags)
|
||||
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
|
||||
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg, &succeed);
|
||||
if (succeed == 0) {
|
||||
return TSDB_CODE_TSC_APP_ERROR;
|
||||
}
|
||||
|
||||
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
|
||||
if (pGroupbyExpr->numOfGroupCols > 0) {
|
||||
|
@ -2081,19 +2091,24 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
|||
assert(pInfo->vgroupList != NULL);
|
||||
|
||||
pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
|
||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||
//just init, no need to lock
|
||||
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||
if (pInfo->vgroupList->numOfVgroups <= 0) {
|
||||
//tfree(pInfo->vgroupList);
|
||||
tscError("%p empty vgroup info", pSql);
|
||||
} else {
|
||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||
//just init, no need to lock
|
||||
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||
|
||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||
pVgroups->vgId = htonl(vmsg->vgId);
|
||||
pVgroups->numOfEps = vmsg->numOfEps;
|
||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||
pVgroups->vgId = htonl(vmsg->vgId);
|
||||
pVgroups->numOfEps = vmsg->numOfEps;
|
||||
|
||||
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
|
||||
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
|
||||
|
||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
||||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
|
||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
||||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,13 @@ static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
|
|||
}
|
||||
|
||||
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
|
||||
STSElem el1 = tsBufGetElem(pTSBuf);
|
||||
|
||||
int32_t res = tVariantCompare(el1.tag, tag1);
|
||||
if (res != 0) { // it is a record with new tag
|
||||
return;
|
||||
}
|
||||
|
||||
while (tsBufNextPos(pTSBuf)) {
|
||||
STSElem el1 = tsBufGetElem(pTSBuf);
|
||||
|
||||
|
@ -118,123 +125,233 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
|
|||
|
||||
|
||||
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
|
||||
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order);
|
||||
|
||||
win->skey = INT64_MAX;
|
||||
win->ekey = INT64_MIN;
|
||||
|
||||
SLimitVal* pLimit = &pQueryInfo->limit;
|
||||
int32_t order = pQueryInfo->order.order;
|
||||
int32_t joinNum = pSql->subState.numOfSub;
|
||||
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
|
||||
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
int32_t slot = 0;
|
||||
size_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
SMergeTsCtx* ctx = NULL;
|
||||
SMergeTsCtx* pctx = NULL;
|
||||
SMergeTsCtx* mainCtx = NULL;
|
||||
STSElem cur;
|
||||
STSElem prev;
|
||||
SArray* tsCond = NULL;
|
||||
int32_t mergeDone = 0;
|
||||
|
||||
SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
|
||||
SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
|
||||
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
|
||||
|
||||
pSubQueryInfo1->tsBuf = output1;
|
||||
pSubQueryInfo2->tsBuf = output2;
|
||||
pSubQueryInfo->tsBuf = output;
|
||||
|
||||
SJoinSupporter* pSupporter = pSql->pSubs[i]->param;
|
||||
|
||||
if (pSupporter->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter->pTSBuf)) {
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tscDebug("%p sub:%p table idx:%d, input group number:%d", pSql, pSql->pSubs[i], i, pSupporter->pTSBuf->numOfGroups);
|
||||
|
||||
ctxlist[i].p = pSupporter;
|
||||
ctxlist[i].res = output;
|
||||
}
|
||||
|
||||
TSKEY st = taosGetTimestampUs();
|
||||
|
||||
// no result generated, return directly
|
||||
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
|
||||
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsBufResetPos(pSupporter1->pTSBuf);
|
||||
tsBufResetPos(pSupporter2->pTSBuf);
|
||||
|
||||
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t numOfInput1 = 1;
|
||||
int64_t numOfInput2 = 1;
|
||||
|
||||
while(1) {
|
||||
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
|
||||
// no data in pSupporter1 anymore, jump out of loop
|
||||
if (!tsBufIsValidElem(&elem)) {
|
||||
break;
|
||||
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
|
||||
pctx = &ctxlist[tidx];
|
||||
if (pctx->compared) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
|
||||
assert(pctx->numOfInput == 0);
|
||||
|
||||
/**
|
||||
* there are elements in pSupporter2 with the same tag, continue
|
||||
*/
|
||||
tVariant tag1 = {0};
|
||||
tVariantAssign(&tag1, elem.tag);
|
||||
tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin;
|
||||
|
||||
tableNum = taosArrayGetSize(tsCond);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tsCond, i);
|
||||
SMergeTsCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, 0);
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
mainCtx = pctx;
|
||||
|
||||
while (1) {
|
||||
pctx = mainCtx;
|
||||
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
if (!tsBufIsValidElem(&prev)) {
|
||||
break;
|
||||
}
|
||||
|
||||
tVariant tag = {0};
|
||||
tVariantAssign(&tag, prev.tag);
|
||||
|
||||
int32_t skipped = 0;
|
||||
|
||||
for (int32_t i = 1; i < tableNum; ++i) {
|
||||
SMergeTsCtx* tctx = &ctxlist[i];
|
||||
|
||||
// find the data in supporter2 with the same tag value
|
||||
STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag);
|
||||
|
||||
if (!tsBufIsValidElem(&e2)) {
|
||||
skipRemainValue(pctx->p->pTSBuf, &tag);
|
||||
skipped = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (skipped) {
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, ++slot);
|
||||
equalNum = 1;
|
||||
|
||||
if (tsBufIsValidElem(&e2)) {
|
||||
while (1) {
|
||||
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
|
||||
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = tsBufGetElem(pctx->p->pTSBuf);
|
||||
cur = tsBufGetElem(ctx->p->pTSBuf);
|
||||
|
||||
// data with current are exhausted
|
||||
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
|
||||
if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
|
||||
* final results which is acquired after the secondary merge of in the client.
|
||||
*/
|
||||
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
|
||||
if (re < 0) {
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
} else if (re > 0) {
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
} else {
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = tsCompare(order, prev.ts, cur.ts);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tsCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (win->skey > elem1.ts) {
|
||||
win->skey = elem1.ts;
|
||||
if (win->skey > prev.ts) {
|
||||
win->skey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < prev.ts) {
|
||||
win->ekey = prev.ts;
|
||||
}
|
||||
|
||||
if (win->ekey < elem1.ts) {
|
||||
win->ekey = elem1.ts;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
prev = tsBufGetElem(tctx->p->pTSBuf);
|
||||
|
||||
tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts));
|
||||
}
|
||||
|
||||
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
|
||||
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
|
||||
} else {
|
||||
pLimit->offset -= 1;//offset apply to projection?
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter1->pTSBuf);
|
||||
numOfInput1++;
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
tsBufNextPos(pSupporter2->pTSBuf);
|
||||
numOfInput2++;
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
if (!tsBufNextPos(ctx->p->pTSBuf) && ctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
ctx->numOfInput++;
|
||||
stackidx--;
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeTsCtx* tctx = ctxStack[i];
|
||||
|
||||
if (!tsBufNextPos(tctx->p->pTSBuf) && tctx == mainCtx) {
|
||||
mergeDone = 1;
|
||||
}
|
||||
tctx->numOfInput++;
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
}
|
||||
|
||||
}
|
||||
} else { // no data in pSupporter2, ignore current data in pSupporter2
|
||||
skipRemainValue(pSupporter1->pTSBuf, &tag1);
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
slot = 0;
|
||||
stackidx = 0;
|
||||
|
||||
skipRemainValue(mainCtx->p->pTSBuf, &tag);
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -242,28 +359,32 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
|||
* 1. only one element
|
||||
* 2. only one element for each tag.
|
||||
*/
|
||||
if (output1->tsOrder == -1) {
|
||||
output1->tsOrder = TSDB_ORDER_ASC;
|
||||
output2->tsOrder = TSDB_ORDER_ASC;
|
||||
if (ctxlist[0].res->tsOrder == -1) {
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
ctxlist[i].res->tsOrder = TSDB_ORDER_ASC;
|
||||
}
|
||||
}
|
||||
|
||||
tsBufFlush(output1);
|
||||
tsBufFlush(output2);
|
||||
|
||||
tsBufDestroy(pSupporter1->pTSBuf);
|
||||
pSupporter1->pTSBuf = NULL;
|
||||
tsBufDestroy(pSupporter2->pTSBuf);
|
||||
pSupporter2->pTSBuf = NULL;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tsBufFlush(ctxlist[i].res);
|
||||
|
||||
tsBufDestroy(ctxlist[i].p->pTSBuf);
|
||||
ctxlist[i].p->pTSBuf = NULL;
|
||||
}
|
||||
|
||||
TSKEY et = taosGetTimestampUs();
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(output1), et - st);
|
||||
|
||||
return output1->numOfTotal;
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
tscDebug("%p sub:%p tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
|
||||
pSql, pSql->pSubs[i], i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey,
|
||||
tsBufGetNumOfGroup(ctxlist[i].res), et - st);
|
||||
}
|
||||
|
||||
return ctxlist[0].res->numOfTotal;
|
||||
}
|
||||
|
||||
|
||||
// todo handle failed to create sub query
|
||||
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
|
||||
SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
|
||||
|
@ -768,76 +889,218 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
|
||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||
|
||||
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);
|
||||
|
||||
// sort according to the tag value
|
||||
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
|
||||
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
|
||||
|
||||
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
|
||||
int16_t joinNum = pParentSql->subState.numOfSub;
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||
|
||||
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
|
||||
SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
|
||||
SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
|
||||
|
||||
// int16_t for padding
|
||||
int32_t size = p1->tagSize - sizeof(int16_t);
|
||||
*s1 = taosArrayInit(p1->num, size);
|
||||
*s2 = taosArrayInit(p2->num, size);
|
||||
int32_t size = p0->tagSize - sizeof(int16_t);
|
||||
|
||||
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
|
||||
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
||||
}
|
||||
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||
|
||||
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match", pParentSql);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < p1->num && j < p2->num) {
|
||||
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
|
||||
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
|
||||
assert(pp1->tid != 0 && pp2->tid != 0);
|
||||
for (int32_t i = 0; i < joinNum; i++) {
|
||||
SJoinSupporter* p = pParentSql->pSubs[i]->param;
|
||||
|
||||
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
|
||||
if (ret == 0) {
|
||||
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
|
||||
*(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);
|
||||
|
||||
taosArrayPush(*s1, pp1);
|
||||
taosArrayPush(*s2, pp2);
|
||||
j++;
|
||||
i++;
|
||||
} else if (ret > 0) {
|
||||
j++;
|
||||
} else {
|
||||
i++;
|
||||
ctxlist[i].p = p;
|
||||
ctxlist[i].res = taosArrayInit(p->num, size);
|
||||
|
||||
tscDebug("Join %d - num:%d", i, p->num);
|
||||
|
||||
// sort according to the tag valu
|
||||
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
|
||||
|
||||
if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
|
||||
for (int32_t j = 0; j <= i; j++) {
|
||||
taosArrayDestroy(ctxlist[j].res);
|
||||
}
|
||||
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
||||
}
|
||||
}
|
||||
|
||||
// reorganize the tid-tag value according to both the vgroup id and tag values
|
||||
// sort according to the tag value
|
||||
size_t t1 = taosArrayGetSize(*s1);
|
||||
size_t t2 = taosArrayGetSize(*s2);
|
||||
int32_t slot = 0;
|
||||
size_t tableNum = 0;
|
||||
int16_t* tableMIdx = 0;
|
||||
int32_t equalNum = 0;
|
||||
int32_t stackidx = 0;
|
||||
int32_t mergeDone = 0;
|
||||
SMergeCtx* ctx = NULL;
|
||||
SMergeCtx* pctx = NULL;
|
||||
STidTags* cur = NULL;
|
||||
STidTags* prev = NULL;
|
||||
SArray* tagCond = NULL;
|
||||
|
||||
qsort((*s1)->pData, t1, size, tidTagsCompar);
|
||||
qsort((*s2)->pData, t2, size, tidTagsCompar);
|
||||
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
|
||||
pctx = &ctxlist[tidx];
|
||||
if (pctx->compared) {
|
||||
continue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
for(int32_t k = 0; k < t1; ++k) {
|
||||
STidTags* p = (*s1)->pData + size * k;
|
||||
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
|
||||
assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);
|
||||
|
||||
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
|
||||
|
||||
tableNum = taosArrayGetSize(tagCond);
|
||||
assert(tableNum >= 2);
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tagCond, i);
|
||||
SMergeCtx* tctx = &ctxlist[*tableMIdx];
|
||||
tctx->compared = 1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tableNum; ++i) {
|
||||
tableMIdx = taosArrayGet(tagCond, i);
|
||||
SMergeCtx* tctx = &ctxlist[*tableMIdx];
|
||||
if (tctx->p->num <= 0 || tctx->p->pIdTagList == NULL) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
mergeDone = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
|
||||
pctx = &ctxlist[*tableMIdx];
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, ++slot);
|
||||
|
||||
equalNum = 1;
|
||||
|
||||
while (1) {
|
||||
ctx = &ctxlist[*tableMIdx];
|
||||
|
||||
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
|
||||
|
||||
assert(cur->tid != 0 && prev->tid != 0);
|
||||
|
||||
ctxStack[stackidx++] = ctx;
|
||||
|
||||
int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
|
||||
if (ret == 0) {
|
||||
if (++equalNum < tableNum) {
|
||||
prev = cur;
|
||||
pctx = ctx;
|
||||
|
||||
if (++slot >= tableNum) {
|
||||
slot = 0;
|
||||
}
|
||||
|
||||
tableMIdx = taosArrayGet(tagCond, slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId,
|
||||
*(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);
|
||||
|
||||
assert(stackidx == tableNum);
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
|
||||
|
||||
taosArrayPush(tctx->res, prev);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
|
||||
ctxStack[stackidx++] = pctx;
|
||||
} else if (ret > 0) {
|
||||
stackidx--;
|
||||
|
||||
if (++ctx->idx >= ctx->p->num) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
stackidx--;
|
||||
|
||||
for (int32_t i = 0; i < stackidx; ++i) {
|
||||
SMergeCtx* tctx = ctxStack[i];
|
||||
if (++tctx->idx >= tctx->p->num) {
|
||||
mergeDone = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (mergeDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
stackidx = 0;
|
||||
equalNum = 1;
|
||||
|
||||
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
|
||||
ctxStack[stackidx++] = pctx;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
slot = 0;
|
||||
mergeDone = 0;
|
||||
stackidx = 0;
|
||||
}
|
||||
|
||||
for(int32_t k = 0; k < t1; ++k) {
|
||||
STidTags* p = (*s2)->pData + size * k;
|
||||
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
|
||||
}
|
||||
#endif
|
||||
for (int32_t i = 0; i < joinNum; ++i) {
|
||||
// reorganize the tid-tag value according to both the vgroup id and tag values
|
||||
// sort according to the tag value
|
||||
size_t num = taosArrayGetSize(ctxlist[i].res);
|
||||
|
||||
qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar);
|
||||
|
||||
tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
|
||||
taosArrayPush(resList, &ctxlist[i].res);
|
||||
|
||||
tscDebug("%p tags match complete, result num: %"PRIzu, pParentSql, num);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool emptyTagList(SArray* resList, int32_t size) {
|
||||
size_t rsize = taosArrayGetSize(resList);
|
||||
if (rsize != size) {
|
||||
return true;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SArray** s = taosArrayGet(resList, i);
|
||||
if (taosArrayGetSize(*s) <= 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
|
||||
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
|
||||
|
||||
|
@ -939,19 +1202,19 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
return;
|
||||
}
|
||||
|
||||
SArray *s1 = NULL, *s2 = NULL;
|
||||
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
|
||||
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
|
||||
|
||||
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, resList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
pParentSql->res.code = code;
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
|
||||
taosArrayDestroy(s1);
|
||||
taosArrayDestroy(s2);
|
||||
taosArrayDestroy(resList);
|
||||
return;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
|
||||
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
|
||||
assert(pParentSql->fp != tscJoinQueryCallback);
|
||||
|
||||
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
|
||||
|
@ -963,37 +1226,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
|
||||
} else {
|
||||
// proceed to for ts_comp query
|
||||
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
|
||||
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
|
||||
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
|
||||
|
||||
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
|
||||
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
|
||||
|
||||
SSqlObj* psub1 = pParentSql->pSubs[0];
|
||||
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);
|
||||
|
||||
SSqlObj* psub2 = pParentSql->pSubs[1];
|
||||
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);
|
||||
|
||||
pParentSql->subState.numOfSub = 2;
|
||||
|
||||
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
|
||||
tscDebug("%p reset all sub states to 0", pParentSql);
|
||||
|
||||
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
|
||||
SSqlObj* sub = pParentSql->pSubs[m];
|
||||
issueTsCompQuery(sub, sub->param, pParentSql);
|
||||
// proceed to for ts_comp query
|
||||
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd;
|
||||
SArray** s = taosArrayGet(resList, m);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
|
||||
|
||||
SSqlObj* psub = pParentSql->pSubs[m];
|
||||
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
|
||||
|
||||
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
|
||||
tscDebug("%p reset all sub states to 0", pParentSql);
|
||||
|
||||
issueTsCompQuery(psub, psub->param, pParentSql);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(s1);
|
||||
taosArrayDestroy(s2);
|
||||
size_t rsize = taosArrayGetSize(resList);
|
||||
for (int32_t i = 0; i < rsize; ++i) {
|
||||
SArray** s = taosArrayGet(resList, i);
|
||||
if (*s) {
|
||||
taosArrayDestroy(*s);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(resList);
|
||||
}
|
||||
|
||||
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
|
||||
|
@ -1124,12 +1384,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|||
|
||||
tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
|
||||
|
||||
// proceeds to launched secondary query to retrieve final data
|
||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||
|
||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
|
||||
int64_t num = doTSBlockIntersect(pParentSql, &win);
|
||||
if (num <= 0) { // no result during ts intersect
|
||||
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
|
||||
freeJoinSubqueryObj(pParentSql);
|
||||
|
@ -1639,6 +1895,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
pNewQueryInfo->limit.limit = -1;
|
||||
pNewQueryInfo->limit.offset = 0;
|
||||
|
||||
pNewQueryInfo->order.orderColId = INT32_MIN;
|
||||
|
||||
// backup the data and clear it in the sqlcmd object
|
||||
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
|
||||
|
||||
|
|
|
@ -1279,6 +1279,34 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pColumnList);
|
||||
int16_t col = pColIndex->columnIndex;
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < numOfCols) {
|
||||
SColumn* pCol = taosArrayGetP(pColumnList, i);
|
||||
if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
|
||||
++i;
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i >= numOfCols || numOfCols == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
|
||||
// ignore the tbname columnIndex to be inserted into source list
|
||||
if (pColIndex->columnIndex < 0) {
|
||||
|
@ -1583,7 +1611,25 @@ int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
|
|||
dest->tbnameCond.uid = src->tbnameCond.uid;
|
||||
dest->tbnameCond.len = src->tbnameCond.len;
|
||||
|
||||
memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
|
||||
dest->joinInfo.hasJoin = src->joinInfo.hasJoin;
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
|
||||
if (src->joinInfo.joinTables[i]) {
|
||||
dest->joinInfo.joinTables[i] = calloc(1, sizeof(SJoinNode));
|
||||
|
||||
memcpy(dest->joinInfo.joinTables[i], src->joinInfo.joinTables[i], sizeof(SJoinNode));
|
||||
|
||||
if (src->joinInfo.joinTables[i]->tsJoin) {
|
||||
dest->joinInfo.joinTables[i]->tsJoin = taosArrayDup(src->joinInfo.joinTables[i]->tsJoin);
|
||||
}
|
||||
|
||||
if (src->joinInfo.joinTables[i]->tagJoin) {
|
||||
dest->joinInfo.joinTables[i]->tagJoin = taosArrayDup(src->joinInfo.joinTables[i]->tagJoin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
dest->relType = src->relType;
|
||||
|
||||
if (src->pCond == NULL) {
|
||||
|
@ -1629,6 +1675,23 @@ void tscTagCondRelease(STagCond* pTagCond) {
|
|||
taosArrayDestroy(pTagCond->pCond);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
|
||||
SJoinNode *node = pTagCond->joinInfo.joinTables[i];
|
||||
if (node == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (node->tsJoin != NULL) {
|
||||
taosArrayDestroy(node->tsJoin);
|
||||
}
|
||||
|
||||
if (node->tagJoin != NULL) {
|
||||
taosArrayDestroy(node->tagJoin);
|
||||
}
|
||||
|
||||
tfree(node);
|
||||
}
|
||||
|
||||
memset(pTagCond, 0, sizeof(STagCond));
|
||||
}
|
||||
|
||||
|
@ -2318,16 +2381,21 @@ void tscDoQuery(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
||||
if (pTagCond->joinInfo.left.uid == uid) {
|
||||
return pTagCond->joinInfo.left.tagColId;
|
||||
} else if (pTagCond->joinInfo.right.uid == uid) {
|
||||
return pTagCond->joinInfo.right.tagColId;
|
||||
} else {
|
||||
assert(0);
|
||||
return -1;
|
||||
int32_t i = 0;
|
||||
while (i < TSDB_MAX_JOIN_TABLE_NUM) {
|
||||
SJoinNode* node = pTagCond->joinInfo.joinTables[i];
|
||||
if (node && node->uid == uid) {
|
||||
return node->tagColId;
|
||||
}
|
||||
|
||||
i++;
|
||||
}
|
||||
|
||||
assert(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
|
||||
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t tsMaxBinaryDisplayWidth = 30;
|
|||
int32_t tsCompressMsgSize = -1;
|
||||
|
||||
// client
|
||||
int32_t tsMaxSQLStringLen = TSDB_MAX_SQL_LEN;
|
||||
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
|
||||
int8_t tsTscEnableRecordSql = 0;
|
||||
|
||||
// the maximum number of results for projection query on super table that are returned from
|
||||
|
|
|
@ -259,7 +259,7 @@ do { \
|
|||
#define TSDB_MIN_TABLES 4
|
||||
#define TSDB_MAX_TABLES 10000000
|
||||
#define TSDB_DEFAULT_TABLES 1000000
|
||||
#define TSDB_TABLES_STEP 1000
|
||||
#define TSDB_TABLES_STEP 100
|
||||
|
||||
#define TSDB_MIN_DAYS_PER_FILE 1
|
||||
#define TSDB_MAX_DAYS_PER_FILE 3650
|
||||
|
@ -317,7 +317,7 @@ do { \
|
|||
#define TSDB_MAX_DB_QUORUM_OPTION 2
|
||||
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
|
||||
|
||||
#define TSDB_MAX_JOIN_TABLE_NUM 5
|
||||
#define TSDB_MAX_JOIN_TABLE_NUM 10
|
||||
#define TSDB_MAX_UNION_CLAUSE 5
|
||||
|
||||
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_BYTES_PER_ROW-TSDB_KEYSIZE)
|
||||
|
|
|
@ -3,6 +3,53 @@ PROJECT(TDengine)
|
|||
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
||||
|
||||
FIND_PACKAGE(Git)
|
||||
IF (GIT_FOUND)
|
||||
MESSAGE("Git found")
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND ${GIT_EXECUTABLE} log --pretty=oneline -n 1 ../src/kit/taosdemo/taosdemo.c
|
||||
RESULT_VARIABLE RESULT
|
||||
OUTPUT_VARIABLE TAOSDEMO_COMMIT)
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND bash "-c" "echo '${TAOSDEMO_COMMIT}' | awk '{print $1}' | cut -c -9"
|
||||
RESULT_VARIABLE RESULT
|
||||
OUTPUT_VARIABLE TAOSDEMO_COMMIT_SHA1)
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND ${GIT_EXECUTABLE} status -z -s ../src/kit/taosdemo/taosdemo.c
|
||||
RESULT_VARIABLE RESULT
|
||||
OUTPUT_VARIABLE TAOSDEMO_STATUS)
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND bash "-c" "echo '${TAOSDEMO_STATUS}' | awk '{print $1}'"
|
||||
RESULT_VARIABLE RESULT
|
||||
OUTPUT_VARIABLE TAOSDEMO_STATUS)
|
||||
MESSAGE("taosdemo.c status: " ${TAOSDEMO_STATUS})
|
||||
ELSE()
|
||||
MESSAGE("Git not found")
|
||||
SET(TAOSDEMO_COMMIT_SHA1 "unknown")
|
||||
SET(TAOSDEMO_STATUS "unknown")
|
||||
ENDIF (GIT_FOUND)
|
||||
|
||||
STRING(STRIP ${TAOSDEMO_COMMIT_SHA1} TAOSDEMO_COMMIT_SHA1)
|
||||
MESSAGE("taosdemo's latest commit in short is:" ${TAOSDEMO_COMMIT_SHA1})
|
||||
STRING(STRIP ${TAOSDEMO_STATUS} TAOSDEMO_STATUS)
|
||||
|
||||
IF (TAOSDEMO_STATUS MATCHES "M")
|
||||
SET(TAOSDEMO_STATUS "modified")
|
||||
ELSE()
|
||||
SET(TAOSDEMO_STATUS "")
|
||||
ENDIF ()
|
||||
MESSAGE("taosdemo's status is:" ${TAOSDEMO_STATUS})
|
||||
|
||||
ADD_DEFINITIONS(-DTAOSDEMO_COMMIT_SHA1="${TAOSDEMO_COMMIT_SHA1}")
|
||||
ADD_DEFINITIONS(-DTAOSDEMO_STATUS="${TAOSDEMO_STATUS}")
|
||||
|
||||
MESSAGE("VERNUMBER is:" ${VERNUMBER})
|
||||
IF (VERNUMBER MATCHES "")
|
||||
ADD_DEFINITIONS(-DTD_VERNUMBER="TDengie-version-unknown")
|
||||
ELSE()
|
||||
ADD_DEFINITIONS(-DTD_VERNUMBER="${VERNUMBER}")
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
ADD_EXECUTABLE(taosdemo ${SRC})
|
||||
|
|
|
@ -86,7 +86,7 @@ enum TEST_MODE {
|
|||
#define MAX_COLUMN_COUNT 1024
|
||||
#define MAX_TAG_COUNT 128
|
||||
|
||||
#define MAX_QUERY_SQL_COUNT 10
|
||||
#define MAX_QUERY_SQL_COUNT 100
|
||||
#define MAX_QUERY_SQL_LENGTH 256
|
||||
|
||||
#define MAX_DATABASE_COUNT 256
|
||||
|
@ -94,6 +94,7 @@ enum TEST_MODE {
|
|||
|
||||
#define DEFAULT_TIMESTAMP_STEP 1
|
||||
|
||||
|
||||
typedef enum CREATE_SUB_TALBE_MOD_EN {
|
||||
PRE_CREATE_SUBTBL,
|
||||
AUTO_CREATE_SUBTBL,
|
||||
|
@ -118,11 +119,11 @@ typedef enum enum_INSERT_MODE {
|
|||
INVALID_INSERT_MODE
|
||||
} INSERT_MODE;
|
||||
|
||||
enum QUERY_TYPE {
|
||||
typedef enum enumQUERY_TYPE {
|
||||
NO_INSERT_TYPE,
|
||||
INSERT_TYPE,
|
||||
QUERY_TYPE_BUT
|
||||
} ;
|
||||
} QUERY_TYPE;
|
||||
|
||||
enum _show_db_index {
|
||||
TSDB_SHOW_DB_NAME_INDEX,
|
||||
|
@ -141,7 +142,7 @@ enum _show_db_index {
|
|||
TSDB_SHOW_DB_FSYNC_INDEX,
|
||||
TSDB_SHOW_DB_COMP_INDEX,
|
||||
TSDB_SHOW_DB_CACHELAST_INDEX,
|
||||
TSDB_SHOW_DB_PRECISION_INDEX,
|
||||
TSDB_SHOW_DB_PRECISION_INDEX,
|
||||
TSDB_SHOW_DB_UPDATE_INDEX,
|
||||
TSDB_SHOW_DB_STATUS_INDEX,
|
||||
TSDB_MAX_SHOW_DB
|
||||
|
@ -225,7 +226,6 @@ typedef struct SColumn_S {
|
|||
typedef struct SSuperTable_S {
|
||||
char sTblName[MAX_TB_NAME_SIZE+1];
|
||||
int childTblCount;
|
||||
bool superTblExists; // 0: no, 1: yes
|
||||
bool childTblExists; // 0: no, 1: yes
|
||||
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
|
||||
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||
|
@ -236,7 +236,7 @@ typedef struct SSuperTable_S {
|
|||
int childTblOffset;
|
||||
|
||||
int multiThreadWriteOneTbl; // 0: no, 1: yes
|
||||
int rowsPerTbl; //
|
||||
int interlaceRows; //
|
||||
int disorderRatio; // 0: no disorder, >0: x%
|
||||
int disorderRange; // ms or us by database precision
|
||||
int maxSqlLen; //
|
||||
|
@ -499,7 +499,7 @@ static int taosRandom()
|
|||
|
||||
static int createDatabases();
|
||||
static void createChildTables();
|
||||
static int queryDbExec(TAOS *taos, char *command, int type);
|
||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
|
||||
|
||||
/* ************ Global variables ************ */
|
||||
|
||||
|
@ -590,6 +590,32 @@ static FILE * g_fpOfInsertResult = NULL;
|
|||
|
||||
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
|
||||
|
||||
#ifndef TAOSDEMO_COMMIT_SHA1
|
||||
#define TAOSDEMO_COMMIT_SHA1 "unknown"
|
||||
#endif
|
||||
|
||||
#ifndef TD_VERNUMBER
|
||||
#define TD_VERNUMBER "unknown"
|
||||
#endif
|
||||
|
||||
#ifndef TAOSDEMO_STATUS
|
||||
#define TAOSDEMO_STATUS "unknown"
|
||||
#endif
|
||||
|
||||
static void printVersion() {
|
||||
char tdengine_ver[] = TD_VERNUMBER;
|
||||
char taosdemo_ver[] = TAOSDEMO_COMMIT_SHA1;
|
||||
char taosdemo_status[] = TAOSDEMO_STATUS;
|
||||
|
||||
if (strlen(taosdemo_status) == 0) {
|
||||
printf("taosdemo verison %s-%s\n",
|
||||
tdengine_ver, taosdemo_ver);
|
||||
} else {
|
||||
printf("taosdemo verison %s-%s, status:%s\n",
|
||||
tdengine_ver, taosdemo_ver, taosdemo_status);
|
||||
}
|
||||
}
|
||||
|
||||
static void printHelp() {
|
||||
char indent[10] = " ";
|
||||
printf("%s%s%s%s\n", indent, "-f", indent,
|
||||
|
@ -647,6 +673,8 @@ static void printHelp() {
|
|||
"Out of order data's range, ms, default is 1000.");
|
||||
printf("%s%s%s%s\n", indent, "-g", indent,
|
||||
"Print debug info.");
|
||||
printf("%s%s%s%s\n", indent, "-V, --version", indent,
|
||||
"Print version info.");
|
||||
/* printf("%s%s%s%s\n", indent, "-D", indent,
|
||||
"if elete database if exists. 0: no, 1: yes, default is 1");
|
||||
*/
|
||||
|
@ -788,6 +816,10 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
|| arguments->method_of_delete > 3) {
|
||||
arguments->method_of_delete = 0;
|
||||
}
|
||||
} else if ((strcmp(argv[i], "--version") == 0) ||
|
||||
(strcmp(argv[i], "-V") == 0)){
|
||||
printVersion();
|
||||
exit(0);
|
||||
} else if (strcmp(argv[i], "--help") == 0) {
|
||||
printHelp();
|
||||
exit(0);
|
||||
|
@ -859,7 +891,7 @@ static void tmfree(char *buf) {
|
|||
}
|
||||
}
|
||||
|
||||
static int queryDbExec(TAOS *taos, char *command, int type) {
|
||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
|
||||
int i;
|
||||
TAOS_RES *res = NULL;
|
||||
int32_t code = -1;
|
||||
|
@ -874,12 +906,14 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
|
|||
code = taos_errno(res);
|
||||
if (0 == code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
|
||||
errorPrint( "Failed to run %s, reason: %s\n", command, taos_errstr(res));
|
||||
if (!quiet) {
|
||||
debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
|
||||
errorPrint("Failed to run %s, reason: %s\n", command, taos_errstr(res));
|
||||
}
|
||||
taos_free_result(res);
|
||||
//taos_close(taos);
|
||||
return -1;
|
||||
|
@ -890,7 +924,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
|
|||
taos_free_result(res);
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
|
||||
taos_free_result(res);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1166,8 +1200,8 @@ static int printfInsertMeta() {
|
|||
}else {
|
||||
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
|
||||
}
|
||||
printf(" rowsPerTbl: \033[33m%d\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].rowsPerTbl);
|
||||
printf(" interlaceRows: \033[33m%d\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||
printf(" disorderRange: \033[33m%d\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].disorderRange);
|
||||
printf(" disorderRatio: \033[33m%d\033[0m\n",
|
||||
|
@ -1328,7 +1362,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
|||
}else {
|
||||
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
|
||||
}
|
||||
fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl);
|
||||
fprintf(fp, " interlaceRows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
|
||||
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
|
||||
fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen);
|
||||
|
@ -1388,11 +1422,11 @@ static void printfQueryMeta() {
|
|||
printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName);
|
||||
|
||||
printf("\n");
|
||||
printf("specified table query info: \n");
|
||||
printf("specified table query info: \n");
|
||||
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate);
|
||||
printf("query times: \033[33m%d\033[0m\n", g_args.query_times);
|
||||
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
|
||||
|
||||
if (SUBSCRIBE_TEST == g_args.test_mode) {
|
||||
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
|
||||
|
@ -1405,7 +1439,7 @@ static void printfQueryMeta() {
|
|||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]);
|
||||
}
|
||||
printf("\n");
|
||||
printf("super table query info: \n");
|
||||
printf("super table query info: \n");
|
||||
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.rate);
|
||||
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.threadCnt);
|
||||
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount);
|
||||
|
@ -1417,11 +1451,11 @@ static void printfQueryMeta() {
|
|||
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.sqlCount);
|
||||
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.sqlCount);
|
||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.subQueryInfo.sql[i]);
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
|
||||
SHOW_PARSE_RESULT_END();
|
||||
|
@ -2320,7 +2354,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
|
|||
dbName, superTbls->sTblName, cols, tags);
|
||||
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
|
||||
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
|
||||
errorPrint( "create supertable %s failed!\n\n",
|
||||
superTbls->sTblName);
|
||||
return -1;
|
||||
|
@ -2344,7 +2378,7 @@ static int createDatabases() {
|
|||
if (g_Dbs.db[i].drop) {
|
||||
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
|
||||
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
|
||||
taos_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
@ -2418,7 +2452,7 @@ static int createDatabases() {
|
|||
}
|
||||
|
||||
debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
|
||||
taos_close(taos);
|
||||
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
|
||||
return -1;
|
||||
|
@ -2428,7 +2462,13 @@ static int createDatabases() {
|
|||
debugPrint("%s() %d supertbl count:%d\n",
|
||||
__func__, __LINE__, g_Dbs.db[i].superTblCount);
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
if ((g_Dbs.db[i].drop) || (g_Dbs.db[i].superTbls[j].superTblExists == TBL_NO_EXISTS)) {
|
||||
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
|
||||
g_Dbs.db[i].superTbls[j].sTblName);
|
||||
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||
|
||||
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
|
||||
|
||||
if ((ret != 0) || (g_Dbs.db[i].drop)) {
|
||||
ret = createSuperTable(taos, g_Dbs.db[i].dbName,
|
||||
&g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
|
||||
|
||||
|
@ -2439,27 +2479,14 @@ static int createDatabases() {
|
|||
}
|
||||
}
|
||||
|
||||
/* describe super table, if exists
|
||||
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
|
||||
g_Dbs.db[i].superTbls[j].sTblName);
|
||||
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
|
||||
|
||||
} else {
|
||||
*/
|
||||
g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS;
|
||||
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
|
||||
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
|
||||
&g_Dbs.db[i].superTbls[j]);
|
||||
//}
|
||||
if (0 != ret) {
|
||||
errorPrint("\nget super table %s.%s info failed!\n\n", g_Dbs.db[i].dbName,
|
||||
g_Dbs.db[i].superTbls[j].sTblName);
|
||||
taos_close(taos);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (0 != ret) {
|
||||
errorPrint("\nget super table %s.%s info failed!\n\n",
|
||||
g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
|
||||
taos_close(taos);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2539,9 +2566,9 @@ static void* createTable(void *sarg)
|
|||
|
||||
len = 0;
|
||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
|
||||
free(buffer);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)){
|
||||
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2555,7 +2582,7 @@ static void* createTable(void *sarg)
|
|||
|
||||
if (0 != len) {
|
||||
verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)) {
|
||||
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
|
||||
}
|
||||
}
|
||||
|
@ -2651,8 +2678,8 @@ static void createChildTables() {
|
|||
int startFrom = 0;
|
||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||
|
||||
verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__,
|
||||
g_totalChildTables, startFrom);
|
||||
verbosePrint("%s() LN%d: create %d child tables from %d\n",
|
||||
__func__, __LINE__, g_totalChildTables, startFrom);
|
||||
startMultiThreadCreateChildTable(
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
|
@ -3028,9 +3055,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
if (threads2 && threads2->type == cJSON_Number) {
|
||||
g_Dbs.threadCountByCreateTbl = threads2->valueint;
|
||||
} else if (!threads2) {
|
||||
g_Dbs.threadCountByCreateTbl = 1;
|
||||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
} else {
|
||||
printf("ERROR: failed to read json, threads2 not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, threads2 not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3545,13 +3573,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "interlace_rows");
|
||||
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
|
||||
g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint;
|
||||
} else if (!rowsPerTbl) {
|
||||
g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||
cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
|
||||
if (interlaceRows && interlaceRows->type == cJSON_Number) {
|
||||
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
|
||||
} else if (!interlaceRows) {
|
||||
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, rowsPerTbl input mistake\n", __func__, __LINE__);
|
||||
errorPrint(
|
||||
"%s() LN%d, failed to read json, interlace rows input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3581,7 +3611,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!insertRows) {
|
||||
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3593,16 +3624,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
__func__, __LINE__, g_args.insert_interval);
|
||||
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
/* CBD if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable
|
||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
*/
|
||||
|
||||
int retVal = getColumnAndTagTypeFromInsertJsonFile(
|
||||
stbInfo, &g_Dbs.db[i].superTbls[j]);
|
||||
if (false == retVal) {
|
||||
|
@ -4223,7 +4249,7 @@ static int execInsert(threadInfo *pThreadInfo, char *buffer, int k)
|
|||
__func__, __LINE__, buffer);
|
||||
if (superTblInfo) {
|
||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE);
|
||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
||||
} else {
|
||||
if (0 != postProceSql(g_Dbs.host, g_Dbs.port, buffer)) {
|
||||
affectedRows = -1;
|
||||
|
@ -4233,7 +4259,7 @@ static int execInsert(threadInfo *pThreadInfo, char *buffer, int k)
|
|||
}
|
||||
}
|
||||
} else {
|
||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, 1);
|
||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
||||
}
|
||||
|
||||
return affectedRows;
|
||||
|
@ -4472,17 +4498,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
int insertMode;
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.interlace_rows;
|
||||
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
|
||||
|
||||
if (rowsPerTbl > 0) {
|
||||
if (interlaceRows > 0) {
|
||||
insertMode = INTERLACE_INSERT_MODE;
|
||||
} else {
|
||||
insertMode = PROGRESSIVE_INSERT_MODE;
|
||||
}
|
||||
|
||||
// rows per table need be less than insert batch
|
||||
if (rowsPerTbl > g_args.num_of_RPR)
|
||||
rowsPerTbl = g_args.num_of_RPR;
|
||||
if (interlaceRows > g_args.num_of_RPR)
|
||||
interlaceRows = g_args.num_of_RPR;
|
||||
|
||||
pThreadInfo->totalInsertRows = 0;
|
||||
pThreadInfo->totalAffectedRows = 0;
|
||||
|
@ -4510,13 +4536,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
|
||||
assert(pThreadInfo->ntables > 0);
|
||||
|
||||
if (rowsPerTbl > g_args.num_of_RPR)
|
||||
rowsPerTbl = g_args.num_of_RPR;
|
||||
if (interlaceRows > g_args.num_of_RPR)
|
||||
interlaceRows = g_args.num_of_RPR;
|
||||
|
||||
batchPerTbl = rowsPerTbl;
|
||||
if ((rowsPerTbl > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTbl = interlaceRows;
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
(g_args.num_of_RPR / (rowsPerTbl * pThreadInfo->ntables)) + 1;
|
||||
(g_args.num_of_RPR / (interlaceRows * pThreadInfo->ntables)) + 1;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
|
@ -4797,9 +4823,9 @@ static void* syncWrite(void *sarg) {
|
|||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||
|
||||
int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.interlace_rows;
|
||||
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
|
||||
|
||||
if (rowsPerTbl > 0) {
|
||||
if (interlaceRows > 0) {
|
||||
// interlace mode
|
||||
return syncWriteInterlace(winfo);
|
||||
} else {
|
||||
|
@ -4876,7 +4902,7 @@ static void *asyncWrite(void *sarg) {
|
|||
winfo->et = 0;
|
||||
winfo->lastTs = winfo->start_time;
|
||||
|
||||
int insert_interval =
|
||||
int insert_interval =
|
||||
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
||||
if (insert_interval) {
|
||||
winfo->st = taosGetTimestampUs();
|
||||
|
@ -4941,12 +4967,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
} else if (0 == strncasecmp(precision, "us", 2)) {
|
||||
timePrec = TSDB_TIME_PRECISION_MICRO;
|
||||
} else {
|
||||
errorPrint( "No support precision: %s\n", precision);
|
||||
errorPrint("Not support precision: %s\n", precision);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t start_time;
|
||||
int64_t start_time;
|
||||
if (superTblInfo) {
|
||||
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
|
||||
start_time = taosGetTimestamp(timePrec);
|
||||
|
@ -4973,19 +4999,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
startFrom = 0;
|
||||
|
||||
// read sample data from file first
|
||||
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
|
||||
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
|
||||
"sample", strlen("sample")))) {
|
||||
if (0 != prepareSampleDataForSTable(superTblInfo)) {
|
||||
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
|
||||
__func__, __LINE__);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
// read sample data from file first
|
||||
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
|
||||
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
|
||||
"sample", strlen("sample")))) {
|
||||
if (0 != prepareSampleDataForSTable(superTblInfo)) {
|
||||
errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
|
||||
__func__, __LINE__);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
@ -5045,7 +5073,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
g_Dbs.host, g_Dbs.user,
|
||||
g_Dbs.password, db_name, g_Dbs.port);
|
||||
if (NULL == t_info->taos) {
|
||||
errorPrint( "connect to server fail from insert sub thread, reason: %s\n",
|
||||
errorPrint(
|
||||
"connect to server fail from insert sub thread, reason: %s\n",
|
||||
taos_errstr(NULL));
|
||||
exit(-1);
|
||||
}
|
||||
|
@ -5144,7 +5173,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
|
||||
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||
|
||||
|
||||
//taos_close(taos);
|
||||
|
||||
free(pids);
|
||||
|
@ -5187,7 +5216,8 @@ static void *readTable(void *sarg) {
|
|||
double totalT = 0;
|
||||
int count = 0;
|
||||
for (int i = 0; i < num_of_tables; i++) {
|
||||
sprintf(command, "select %s from %s%d where ts>= %" PRId64, aggreFunc[j], tb_prefix, i, sTime);
|
||||
sprintf(command, "select %s from %s%d where ts>= %" PRId64,
|
||||
aggreFunc[j], tb_prefix, i, sTime);
|
||||
|
||||
double t = getCurrentTime();
|
||||
TAOS_RES *pSql = taos_query(taos, command);
|
||||
|
@ -5340,10 +5370,10 @@ static int insertTestProcess() {
|
|||
|
||||
if (g_totalChildTables > 0) {
|
||||
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
|
||||
end - start, g_totalChildTables, g_Dbs.threadCount);
|
||||
end - start, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
||||
fprintf(g_fpOfInsertResult,
|
||||
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
|
||||
end - start, g_totalChildTables, g_Dbs.threadCount);
|
||||
end - start, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
||||
}
|
||||
|
||||
taosMsleep(1000);
|
||||
|
@ -5394,7 +5424,10 @@ static void *superQueryProcess(void *sarg) {
|
|||
|
||||
int64_t st = 0;
|
||||
int64_t et = 0;
|
||||
while (1) {
|
||||
|
||||
int queryTimes = g_args.query_times;
|
||||
|
||||
while(queryTimes --) {
|
||||
if (g_queryInfo.superQueryInfo.rate && (et - st) <
|
||||
(int64_t)g_queryInfo.superQueryInfo.rate*1000) {
|
||||
taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
|
||||
|
@ -5443,12 +5476,12 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
|
|||
g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
|
||||
|
||||
//printf("inSql: %s\n", inSql);
|
||||
|
||||
|
||||
char* pos = strstr(inSql, sourceString);
|
||||
if (0 == pos) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
tstrncpy(outSql, inSql, pos - inSql + 1);
|
||||
//printf("1: %s\n", outSql);
|
||||
strcat(outSql, subTblName);
|
||||
|
@ -5464,7 +5497,7 @@ static void *subQueryProcess(void *sarg) {
|
|||
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
|
||||
int queryTimes = g_args.query_times;
|
||||
|
||||
while (queryTimes --) {
|
||||
while(queryTimes --) {
|
||||
if (g_queryInfo.subQueryInfo.rate
|
||||
&& (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) {
|
||||
taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms
|
||||
|
@ -5486,7 +5519,7 @@ static void *subQueryProcess(void *sarg) {
|
|||
}
|
||||
}
|
||||
et = taosGetTimestampUs();
|
||||
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
|
||||
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
|
||||
taosGetSelfPthreadId(),
|
||||
winfo->start_table_from,
|
||||
winfo->end_table_to,
|
||||
|
@ -5509,7 +5542,8 @@ static int queryTestProcess() {
|
|||
NULL,
|
||||
g_queryInfo.port);
|
||||
if (taos == NULL) {
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n",
|
||||
taos_errstr(NULL));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
|
@ -5556,7 +5590,7 @@ static int queryTestProcess() {
|
|||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
||||
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
||||
free(infos);
|
||||
free(pids);
|
||||
errorPrint( "use database %s failed!\n\n",
|
||||
|
@ -5686,7 +5720,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)){
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5752,7 +5786,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5830,7 +5864,8 @@ static int subscribeTestProcess() {
|
|||
g_queryInfo.dbName,
|
||||
g_queryInfo.port);
|
||||
if (taos == NULL) {
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n",
|
||||
taos_errstr(NULL));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
|
@ -5856,7 +5891,7 @@ static int subscribeTestProcess() {
|
|||
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
|
||||
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
|
||||
if ((NULL == pids) || (NULL == infos)) {
|
||||
printf("malloc failed for create threads\n");
|
||||
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
|
||||
taos_close(taos);
|
||||
exit(-1);
|
||||
}
|
||||
|
@ -5931,7 +5966,7 @@ static int subscribeTestProcess() {
|
|||
|
||||
static void initOfInsertMeta() {
|
||||
memset(&g_Dbs, 0, sizeof(SDbs));
|
||||
|
||||
|
||||
// set default values
|
||||
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE);
|
||||
g_Dbs.port = 6030;
|
||||
|
@ -5944,7 +5979,7 @@ static void initOfInsertMeta() {
|
|||
|
||||
static void initOfQueryMeta() {
|
||||
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
|
||||
|
||||
|
||||
// set default values
|
||||
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE);
|
||||
g_queryInfo.port = 6030;
|
||||
|
@ -5961,15 +5996,15 @@ static void setParaFromArg(){
|
|||
|
||||
if (g_args.user) {
|
||||
strcpy(g_Dbs.user, g_args.user);
|
||||
}
|
||||
}
|
||||
|
||||
if (g_args.password) {
|
||||
strcpy(g_Dbs.password, g_args.password);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (g_args.port) {
|
||||
g_Dbs.port = g_args.port;
|
||||
}
|
||||
}
|
||||
|
||||
g_Dbs.threadCount = g_args.num_of_threads;
|
||||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
|
@ -5990,11 +6025,11 @@ static void setParaFromArg(){
|
|||
|
||||
char dataString[STRING_LEN];
|
||||
char **data_type = g_args.datatype;
|
||||
|
||||
|
||||
memset(dataString, 0, STRING_LEN);
|
||||
|
||||
if (strcasecmp(data_type[0], "BINARY") == 0
|
||||
|| strcasecmp(data_type[0], "BOOL") == 0
|
||||
if (strcasecmp(data_type[0], "BINARY") == 0
|
||||
|| strcasecmp(data_type[0], "BOOL") == 0
|
||||
|| strcasecmp(data_type[0], "NCHAR") == 0 ) {
|
||||
g_Dbs.do_aggreFunc = false;
|
||||
}
|
||||
|
@ -6004,22 +6039,21 @@ static void setParaFromArg(){
|
|||
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
|
||||
g_Dbs.threadCount = g_args.num_of_threads;
|
||||
g_Dbs.threadCountByCreateTbl = 1;
|
||||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
g_Dbs.queryMode = g_args.mode;
|
||||
|
||||
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS;
|
||||
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
|
||||
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
|
||||
g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
|
||||
g_args.tb_prefix, MAX_TB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
|
||||
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
|
||||
|
||||
|
||||
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
|
||||
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
|
||||
|
||||
|
@ -6029,33 +6063,32 @@ static void setParaFromArg(){
|
|||
break;
|
||||
}
|
||||
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
|
||||
data_type[i], MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
|
||||
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
|
||||
g_Dbs.db[0].superTbls[0].columnCount++;
|
||||
}
|
||||
|
||||
|
||||
if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
|
||||
g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
|
||||
} else {
|
||||
for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) {
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
|
||||
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
|
||||
g_Dbs.db[0].superTbls[0].columnCount++;
|
||||
}
|
||||
}
|
||||
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
|
||||
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
|
||||
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
|
||||
g_Dbs.db[0].superTbls[0].tagCount = 2;
|
||||
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
|
||||
g_Dbs.db[0].superTbls[0].tagCount = 2;
|
||||
} else {
|
||||
g_Dbs.threadCountByCreateTbl = 1;
|
||||
g_Dbs.db[0].superTbls[0].tagCount = 0;
|
||||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
g_Dbs.db[0].superTbls[0].tagCount = 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* Function to do regular expression check */
|
||||
|
@ -6126,8 +6159,9 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
|
|||
|
||||
memcpy(cmd + cmd_len, line, read_len);
|
||||
verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
|
||||
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) {
|
||||
printf("queryDbExec %s failed!\n", cmd);
|
||||
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE, false)) {
|
||||
errorPrint("%s() LN%d, queryDbExec %s failed!\n",
|
||||
__func__, __LINE__, cmd);
|
||||
tmfree(cmd);
|
||||
tmfree(line);
|
||||
tmfclose(fp);
|
||||
|
@ -6185,7 +6219,7 @@ static void queryResult() {
|
|||
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
|
||||
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
|
||||
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
|
||||
strcpy(rInfo->tb_prefix,
|
||||
strcpy(rInfo->tb_prefix,
|
||||
g_Dbs.db[0].superTbls[0].childTblPrefix);
|
||||
} else {
|
||||
rInfo->ntables = g_args.num_of_tables;
|
||||
|
@ -6194,13 +6228,14 @@ static void queryResult() {
|
|||
}
|
||||
|
||||
rInfo->taos = taos_connect(
|
||||
g_Dbs.host,
|
||||
g_Dbs.user,
|
||||
g_Dbs.password,
|
||||
g_Dbs.db[0].dbName,
|
||||
g_Dbs.host,
|
||||
g_Dbs.user,
|
||||
g_Dbs.password,
|
||||
g_Dbs.db[0].dbName,
|
||||
g_Dbs.port);
|
||||
if (rInfo->taos == NULL) {
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
|
||||
errorPrint( "Failed to connect to TDengine, reason:%s\n",
|
||||
taos_errstr(NULL));
|
||||
free(rInfo);
|
||||
exit(-1);
|
||||
}
|
||||
|
|
|
@ -1017,6 +1017,13 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
|
|||
return pConn;
|
||||
}
|
||||
|
||||
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
|
||||
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
|
||||
SRpcInfo *pRpc = pConn->pRpc;
|
||||
(*(pRpc->cfp))(pRpcMsg, NULL);
|
||||
free(pRpcMsg);
|
||||
}
|
||||
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
|
||||
SRpcInfo *pRpc = pConn->pRpc;
|
||||
if (pConn->pReqMsg == NULL) return;
|
||||
|
@ -1025,16 +1032,20 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
|
|||
rpcAddRef(pRpc);
|
||||
tDebug("%s, notify the server app, connection is gone", pConn->info);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
|
||||
rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
|
||||
rpcMsg.ahandle = pConn->ahandle;
|
||||
rpcMsg.handle = pConn;
|
||||
rpcMsg.msgType = pConn->inType;
|
||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
|
||||
rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
|
||||
rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
|
||||
rpcMsg->ahandle = pConn->ahandle;
|
||||
rpcMsg->handle = pConn;
|
||||
rpcMsg->msgType = pConn->inType;
|
||||
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
pConn->pReqMsg = NULL;
|
||||
pConn->reqMsgLen = 0;
|
||||
if (pRpc->cfp) (*(pRpc->cfp))(&rpcMsg, NULL);
|
||||
if (pRpc->cfp) {
|
||||
taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
|
||||
} else {
|
||||
free(rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||
|
@ -1051,7 +1062,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
|||
pConn->pReqMsg = NULL;
|
||||
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
|
||||
}
|
||||
|
||||
|
||||
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
|
||||
|
||||
rpcReleaseConn(pConn);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#define TSDB_MAX_SUBBLOCKS 8
|
||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
||||
if (key < 0) {
|
||||
return (int)(-((-key) / tsMsPerDay[precision] / days + 1));
|
||||
return (int)((key + 1) / tsMsPerDay[precision] / days + 1);
|
||||
} else {
|
||||
return (int)((key / tsMsPerDay[precision] / days));
|
||||
}
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
def pre_test(){
|
||||
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
|
||||
sh '''
|
||||
sudo rmtaos
|
||||
'''
|
||||
}
|
||||
|
||||
sh '''
|
||||
sudo rmtaos||echo 'no taosd installed'
|
||||
'''
|
||||
sh '''
|
||||
cd ${WKC}
|
||||
git reset --hard
|
||||
|
@ -56,14 +55,8 @@ pipeline {
|
|||
cd ${WKC}/tests
|
||||
./test-all.sh b1
|
||||
date'''
|
||||
sh '''
|
||||
cd ${WKC}/tests
|
||||
./test-all.sh full jdbc
|
||||
date'''
|
||||
sh '''
|
||||
cd ${WKC}/tests
|
||||
./test-all.sh full unit
|
||||
date'''
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,6 +129,10 @@ pipeline {
|
|||
./test-all.sh b2
|
||||
date
|
||||
'''
|
||||
sh '''
|
||||
cd ${WKC}/tests
|
||||
./test-all.sh full unit
|
||||
date'''
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,6 +151,10 @@ pipeline {
|
|||
'''
|
||||
}
|
||||
sh '''
|
||||
cd ${WKC}/tests
|
||||
./test-all.sh full jdbc
|
||||
date'''
|
||||
sh '''
|
||||
cd ${WKC}/tests/pytest
|
||||
./valgrind-test.sh 2>&1 > mem-error-out.log
|
||||
./handle_val_log.sh
|
||||
|
|
|
@ -163,6 +163,16 @@ int main(int argc, char *argv[])
|
|||
|
||||
getchar();
|
||||
|
||||
while(1) {
|
||||
if (tablesProcessed < numOfTables) {
|
||||
printf("wait for process finished\n");
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
taos_close(taos);
|
||||
free(tableList);
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -29,7 +29,25 @@ function dohavecore(){
|
|||
proc=`echo $corefile|cut -d "_" -f3`
|
||||
if [ -n "$corefile" ];then
|
||||
echo 'taosd or taos has generated core'
|
||||
tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz /usr/local/taos/
|
||||
if [[ "$tests_dir" == *"$IN_TDINTERNAL"* ]] && [[ $1 == 1 ]]; then
|
||||
cd ../../../
|
||||
tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug/build/bin/taosd debug/build/bin/tsim debug/build/lib/libtaos*so*
|
||||
if [[ $2 == 1 ]];then
|
||||
cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S"`
|
||||
rm -rf sim/case.log
|
||||
else
|
||||
cd community
|
||||
cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
|
||||
rm -rf sim/case.log
|
||||
fi
|
||||
else
|
||||
cd ../../
|
||||
if [[ $1 == 1 ]];then
|
||||
tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug/build/bin/taosd debug/build/bin/tsim debug/build/lib/libtaos*so*
|
||||
cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
|
||||
rm -rf sim/case.log
|
||||
fi
|
||||
fi
|
||||
if [[ $1 == 1 ]];then
|
||||
echo '\n'|gdb /usr/local/taos/bin/$proc $core_file -ex "bt 10" -ex quit
|
||||
exit 8
|
||||
|
@ -100,14 +118,14 @@ function runSimCaseOneByOnefq {
|
|||
cp -r ../../sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
|
||||
rm -rf ../../sim/case.log
|
||||
fi
|
||||
dohavecore $2
|
||||
dohavecore $2 1
|
||||
if [[ $2 == 1 ]];then
|
||||
exit 8
|
||||
fi
|
||||
fi
|
||||
end_time=`date +%s`
|
||||
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
|
||||
dohavecore $2
|
||||
dohavecore $2 1
|
||||
fi
|
||||
done
|
||||
rm -rf ../../../sim/case.log
|
||||
|
@ -175,7 +193,7 @@ function runPyCaseOneByOnefq() {
|
|||
echo '=====================log===================== '
|
||||
cat ../../sim/case.log
|
||||
rm -rf ../../sim/case.log
|
||||
dohavecore $2
|
||||
dohavecore $2 2
|
||||
if [[ $2 == 1 ]];then
|
||||
exit 8
|
||||
fi
|
||||
|
@ -184,7 +202,7 @@ function runPyCaseOneByOnefq() {
|
|||
else
|
||||
$line > /dev/null 2>&1
|
||||
fi
|
||||
dohavecore $2
|
||||
dohavecore $2 2
|
||||
fi
|
||||
done
|
||||
rm -rf ../../sim/case.log
|
||||
|
@ -211,15 +229,15 @@ if [ "$2" != "jdbc" ] && [ "$2" != "python" ] && [ "$2" != "unit" ]; then
|
|||
echo "### run TSIM b1 test ###"
|
||||
runSimCaseOneByOnefq b1 0
|
||||
runSimCaseOneByOnefq b4 0
|
||||
runSimCaseOneByOnefq b5 0
|
||||
runSimCaseOneByOnefq b6 0
|
||||
runSimCaseOneByOnefq b7 0
|
||||
elif [ "$1" == "b2" ]; then
|
||||
echo "### run TSIM b2 test ###"
|
||||
runSimCaseOneByOnefq b2 0
|
||||
runSimCaseOneByOnefq b5 0
|
||||
elif [ "$1" == "b3" ]; then
|
||||
echo "### run TSIM b3 test ###"
|
||||
runSimCaseOneByOnefq b3 0
|
||||
runSimCaseOneByOnefq b6 0
|
||||
elif [ "$1" == "b1fq" ]; then
|
||||
echo "### run TSIM b1 test ###"
|
||||
runSimCaseOneByOnefq b1 1
|
||||
|
|
Loading…
Reference in New Issue