diff --git a/docs/en/05-get-started/index.md b/docs/en/05-get-started/index.md
index 56958ef3ec..9c5a853820 100644
--- a/docs/en/05-get-started/index.md
+++ b/docs/en/05-get-started/index.md
@@ -1,6 +1,6 @@
---
title: Get Started
-description: 'Install TDengine from Docker image, apt-get or package, and run TAOS CLI and taosBenchmark to experience the features'
+description: 'Install TDengine from Docker image, apt-get or package, and run TDengine CLI and taosBenchmark to experience the features'
---
import Tabs from "@theme/Tabs";
@@ -120,7 +120,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s)
```
-Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TAOS CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/).
+Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TDengine CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/).
## Experience the blazing fast speed
diff --git a/docs/en/07-develop/02-model/index.mdx b/docs/en/07-develop/02-model/index.mdx
index 86853aaaa3..e0378cc77c 100644
--- a/docs/en/07-develop/02-model/index.mdx
+++ b/docs/en/07-develop/02-model/index.mdx
@@ -37,7 +37,7 @@ USE power;
## Create STable
-In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/tdinternal/arch#model_table1), the SQL statement below can be used to create the super table.
+In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/concept/#model_table1), the SQL statement below can be used to create the super table.
```sql
CREATE STable meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
diff --git a/docs/en/07-develop/03-insert-data/01-sql-writing.mdx b/docs/en/07-develop/03-insert-data/01-sql-writing.mdx
index 397b1a14fd..d8c4453f40 100644
--- a/docs/en/07-develop/03-insert-data/01-sql-writing.mdx
+++ b/docs/en/07-develop/03-insert-data/01-sql-writing.mdx
@@ -22,7 +22,7 @@ import CStmt from "./_c_stmt.mdx";
## Introduction
-Application programs can execute `INSERT` statement through connectors to insert rows. The TAOS CLI can also be used to manually insert data.
+Application programs can execute `INSERT` statement through connectors to insert rows. The TDengine CLI can also be used to manually insert data.
### Insert Single Row
diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md
index 878d7f0202..818027174e 100644
--- a/docs/zh/05-get-started/index.md
+++ b/docs/zh/05-get-started/index.md
@@ -1,6 +1,6 @@
---
title: 立即开始
-description: '从 Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TAOS CLI和工具 taosdemo 快速体验 TDengine 功能'
+description: '从 Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TDengine CLI和工具 taosdemo 快速体验 TDengine 功能'
---
import Tabs from "@theme/Tabs";
@@ -122,7 +122,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s)
```
-除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TAOS CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
+除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 3f05c27453..fdedb947d7 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -95,7 +95,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
- qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
+ qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
/**
*
@@ -159,13 +159,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
-/**
- * Update the table id list of a given query.
- * @param uid child table uid
- * @param type operation type: ADD|DROP
- * @return
- */
-int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index b359fa5d6a..54762a0f17 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx {
} SqlFunctionCtx;
enum {
- TEXPR_NODE_DUMMY = 0x0,
TEXPR_BINARYEXPR_NODE= 0x1,
TEXPR_UNARYEXPR_NODE = 0x2,
- TEXPR_FUNCTION_NODE = 0x3,
- TEXPR_COL_NODE = 0x4,
- TEXPR_VALUE_NODE = 0x8,
};
typedef struct tExprNode {
diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h
index 3c25d8add4..3620f45408 100644
--- a/include/libs/nodes/nodes.h
+++ b/include/libs/nodes/nodes.h
@@ -65,6 +65,12 @@ extern "C" {
(list) = NULL; \
} while (0)
+#define NODES_CLEAR_LIST(list) \
+ do { \
+ nodesClearList((list)); \
+ (list) = NULL; \
+ } while (0)
+
typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on.
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 26d055d7d2..749d58b224 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -69,6 +69,7 @@ typedef struct SScanLogicNode {
int16_t tsColId;
double filesFactor;
SArray* pSmaIndexes;
+ SNodeList* pPartTags;
} SScanLogicNode;
typedef struct SJoinLogicNode {
@@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode {
double ratio;
int32_t dataRequired;
SNodeList* pDynamicScanFuncs;
- SNodeList* pPartitionKeys;
+ SNodeList* pPartitionTags;
int64_t interval;
int64_t offset;
int64_t sliding;
diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp
index dc15c0b13d..b6ec75f8eb 100644
--- a/source/client/test/clientTests.cpp
+++ b/source/client/test/clientTests.cpp
@@ -44,6 +44,7 @@ void showDB(TAOS* pConn) {
}
void fetchCallback(void* param, void* res, int32_t numOfRow) {
+#if 0
printf("numOfRow = %d \n", numOfRow);
int numFields = taos_num_fields(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
@@ -63,6 +64,13 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) {
// taos_close(_taos);
// taos_cleanup();
}
+#endif
+ if (numOfRow == 0) {
+ printf("completed\n");
+ return;
+ }
+
+ taos_fetch_raw_block_a(res, fetchCallback, param);
}
void queryCallback(void* param, void* res, int32_t code) {
@@ -70,7 +78,7 @@ void queryCallback(void* param, void* res, int32_t code) {
printf("failed to execute, reason:%s\n", taos_errstr(res));
}
printf("start to fetch data\n");
- taos_fetch_rows_a(res, fetchCallback, param);
+ taos_fetch_raw_block_a(res, fetchCallback, param);
}
void queryCallback1(void* param, void* res, int32_t code) {
@@ -778,9 +786,9 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
- taos_query(pConn, "use nest");
-
- TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
+ taos_query(pConn, "use table_alltype_hyperloglog");
+#if 0
+ TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
}
@@ -802,8 +810,9 @@ TEST(testCase, async_api_test) {
printf("%s\n", str);
memset(str, 0, sizeof(str));
}
+#endif
- taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn);
+ taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
getchar();
taos_close(pConn);
}
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 5a2aaed74e..6a4ddacf60 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
SColumnInfoData colInfo = {0};
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfo.info = p->info;
+ colInfo.hasNull = true;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index 53cbb9b669..6d37143fc9 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -370,6 +370,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
if (pObj->state != statusReq.mload.syncState) {
+ mInfo("dnode:%d, mnode syncstate from %s to %s", pObj->id, syncStr(pObj->state), syncStr(statusReq.mload.syncState));
pObj->state = statusReq.mload.syncState;
pObj->stateStartTime = taosGetTimestampMs();
}
diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c
index 00118eac1e..e141e7dfb3 100644
--- a/source/dnode/mnode/impl/src/mndMain.c
+++ b/source/dnode/mnode/impl/src/mndMain.c
@@ -699,6 +699,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
+ mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));
return 0;
}
diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c
index 7ec490d52f..e82c479cdd 100644
--- a/source/dnode/mnode/impl/src/mndMnode.c
+++ b/source/dnode/mnode/impl/src/mndMnode.c
@@ -667,6 +667,10 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
}
if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
roles = syncStr(pObj->state);
+ if (pObj->state == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
+ roles = syncStr(TAOS_SYNC_STATE_ERROR);
+ mError("mnode:%d, is leader too", pObj->id);
+ }
}
char b2[12 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 4be40e6a5c..dafb2fa1d1 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -873,7 +873,8 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->rawWritten = 0;
pAction->msgSent = 0;
pAction->msgReceived = 0;
- if (pAction->errCode == TSDB_CODE_RPC_REDIRECT) {
+ if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NOT_IN_NEW_CONFIG ||
+ pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
action, pAction->epSet.inUse);
@@ -1126,7 +1127,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
pTrans->code = terrno;
mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id,
mndTransStr(pAction->stage), pAction->id, terrstr());
- break;
}
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
@@ -1134,8 +1134,6 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} else {
terrno = code;
pTrans->code = code;
- mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
- terrstr());
break;
}
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 975dff9fb6..25661d2e69 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -72,7 +72,17 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
return -1;
}
- return syncPropose(pVnode->sync, &rpcMsg, false);
+ int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
+ if (code != 0) {
+ vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
+ if (syncLeaderTransfer(pVnode->sync) != 0) {
+ vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
+ } else {
+ vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode));
+ }
+ }
+
+ return code;
}
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h
index ca25b7aab1..a70cff5552 100644
--- a/source/libs/executor/inc/executil.h
+++ b/source/libs/executor/inc/executil.h
@@ -55,7 +55,6 @@ typedef struct SResultRow {
uint32_t numOfRows; // number of rows of current time window
STimeWindow win;
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
-// char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowPosition {
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index cc690a3e5e..b435446513 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -189,7 +189,7 @@ typedef struct SExecTaskInfo {
} schemaVer;
STableListInfo tableqinfoList; // this is a table list
- char* sql; // query sql string
+ const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot;
@@ -544,6 +544,13 @@ typedef struct SFillOperatorInfo {
bool multigroupResult;
} SFillOperatorInfo;
+typedef struct SScalarSupp {
+ SExprInfo* pScalarExprInfo;
+ int32_t numOfScalarExpr; // the number of scalar expression in group operator
+ SqlFunctionCtx* pScalarFuncCtx;
+ int32_t* rowCellInfoOffset; // offset value for each row result cell info
+} SScalarSupp;
+
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
@@ -556,10 +563,7 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
- SExprInfo* pScalarExprInfo;
- int32_t numOfScalarExpr; // the number of scalar expression in group operator
- SqlFunctionCtx* pScalarFuncCtx;
- int32_t* rowCellInfoOffset; // offset value for each row result cell info
+ SScalarSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
@@ -583,6 +587,7 @@ typedef struct SPartitionOperatorInfo {
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes;
+ SScalarSupp scalarSupp;
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
@@ -760,6 +765,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
+SArray* extractPartitionColInfo(SNodeList* pNodeList);
+
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
@@ -839,20 +846,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
-SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
- SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
-SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
- SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
- int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
+ SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
-SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream,
- SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
@@ -890,7 +894,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
- EOPTR_EXEC_MODEL model);
+ const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h
index fd3581e2bf..e731c55a7d 100644
--- a/source/libs/executor/inc/tsort.h
+++ b/source/libs/executor/inc/tsort.h
@@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
*/
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
+/**
+ * get proper sort buffer pages according to the row size
+ * @param rowSize
+ * @return
+ */
+int32_t getProperSortPageSize(size_t rowSize);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 6ba1cf859e..01ed30c189 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -28,17 +28,6 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
-int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
- int32_t size = 0;
-
- for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
-// size += pQueryAttr->pExpr1[i].base.interBytes;
- }
-
- assert(size >= 0);
- return size;
-}
-
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->size = 0;
pResultRowInfo->cur.pageId = -1;
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index bebfc75f17..b1d076e8f5 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -121,7 +121,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t pTaskInfo = NULL;
- code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM);
+ code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index b1125c8e80..00158d7024 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -31,13 +31,13 @@ static void initRefPool() {
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
- qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
+ qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
- int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
+ int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 15191bf435..3a59bfcaae 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include "tref.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -21,6 +20,7 @@
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
+#include "tref.h"
#include "tdatablock.h"
#include "tglobal.h"
@@ -40,9 +40,6 @@
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
-#define SDATA_BLOCK_INITIALIZER \
- (SDataBlockInfo) { {0}, 0 }
-
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#if 0
@@ -79,7 +76,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define realloc u_realloc
#endif
-#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
+#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
@@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
-static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
-
static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
@@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
-static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey, bool ascQuery,
- bool timeWindowInterpo) {
- int64_t skey = TSKEY_INITIAL_VAL;
-#if 0
- int32_t i = 0;
- for (i = pResultRowInfo->size - 1; i >= 0; --i) {
- SResultRow* pResult = pResultRowInfo->pResult[i];
- if (pResult->closed) {
- break;
- }
-
- // new closed result rows
- if (timeWindowInterpo) {
- if (pResult->endInterp &&
- ((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
- if (i > 0) { // the first time window, the startInterp is false.
- assert(pResult->startInterp);
- }
-
- closeResultRow(pResultRowInfo, i);
- } else {
- skey = pResult->win.skey;
- }
- } else {
- if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
- closeResultRow(pResultRowInfo, i);
- } else {
- skey = pResult->win.skey;
- }
- }
- }
-
- // all result rows are closed, set the last one to be the skey
- if (skey == TSKEY_INITIAL_VAL) {
- if (pResultRowInfo->size == 0) {
- // assert(pResultRowInfo->current == NULL);
- assert(pResultRowInfo->curPos == -1);
- pResultRowInfo->curPos = -1;
- } else {
- pResultRowInfo->curPos = pResultRowInfo->size - 1;
- }
- } else {
- for (i = pResultRowInfo->size - 1; i >= 0; --i) {
- SResultRow* pResult = pResultRowInfo->pResult[i];
- if (pResult->closed) {
- break;
- }
- }
-
- if (i == pResultRowInfo->size - 1) {
- pResultRowInfo->curPos = i;
- } else {
- pResultRowInfo->curPos = i + 1; // current not closed result object
- }
- }
-#endif
-}
-//
-// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
-// bool ascQuery, bool interp) {
-// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
-// closeAllResultRows(pResultRowInfo);
-// pResultRowInfo->curPos = pResultRowInfo->size - 1;
-// } else {
-// int32_t step = ascQuery ? 1 : -1;
-// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
-// }
-//}
-
// query_range_start, query_range_end, window_duration, window_start, window_end
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
@@ -886,7 +812,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
assert(pResultRow != NULL);
- setResultRowKey(pResultRow, pData, type);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
@@ -908,21 +833,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
- // if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
- // // return QUERY_IS_ASC_QUERY(pQueryAttr);
- // }
- //
- // // denote the order type
- // if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
- // // return pCtx->param[0].i == pQueryAttr->order.order;
- // }
-
- // in the reverse table scan, only the following functions need to be executed
- // if (IS_REVERSE_SCAN(pRuntimeEnv) ||
- // (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
- // return false;
- // }
-
return true;
}
@@ -2395,7 +2305,7 @@ typedef struct SFetchRspHandleWrapper {
} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
- SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param;
+ SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) {
@@ -2403,7 +2313,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
return TSDB_CODE_SUCCESS;
}
- int32_t index = pWrapper->sourceIndex;
+ int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (code == TSDB_CODE_SUCCESS) {
@@ -2411,9 +2321,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
- pRsp->compLen = htonl(pRsp->compLen);
+ pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
- pRsp->useconds = htobe64(pRsp->useconds);
+ pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
@@ -2475,9 +2385,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
- pMsg->sId = htobe64(pSource->schedId);
- pMsg->taskId = htobe64(pSource->taskId);
- pMsg->queryId = htobe64(pTaskInfo->id.queryId);
+ pMsg->sId = htobe64(pSource->schedId);
+ pMsg->taskId = htobe64(pSource->taskId);
+ pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@@ -2489,14 +2399,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
}
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
- pWrapper->exchangeId = pExchangeInfo->self;
+ pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
- pMsgSendInfo->fp = loadRemoteDataCallback;
+ pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
@@ -2645,7 +2555,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
- ", completed:%d try next %d/%"PRIzu,
+ ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
@@ -2663,8 +2573,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if (pRsp->completed == 1) {
- qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64
- ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
+ qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
+ " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
+ ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1;
@@ -2718,7 +2629,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
- totalSources, (endTs - startTs)/1000.0);
+ totalSources, (endTs - startTs) / 1000.0);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
@@ -2849,8 +2760,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = id;
- dataInfo.index = i;
- SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
+ dataInfo.index = i;
+ SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
@@ -2864,7 +2775,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
if (numOfSources == 0) {
- qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources);
+ qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
return TSDB_CODE_INVALID_PARA;
}
@@ -2898,16 +2809,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0);
- pInfo->seqLoadData = false;
- pInfo->pTransporter = pTransporter;
- pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
- pOperator->name = "ExchangeOperator";
+ pInfo->seqLoadData = false;
+ pInfo->pTransporter = pTransporter;
+ pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
+ pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
@@ -3934,27 +3845,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
}
}
-// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
-// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
-// if (size == 0) {
-// return NULL;
-// }
-//
-// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
-// if (pTableQueryInfo == NULL) {
-// return NULL;
-// }
-//
-// for (int32_t j = 0; j < size; ++j) {
-// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
-// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
-// pTQueryInfo->lastKey = pk->lastKey;
-// }
-//
-// pTableQueryInfo->lastKey = 0;
-// return pTableQueryInfo;
-//}
-
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
@@ -4066,7 +3956,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
}
void doDestroyExchangeOperatorInfo(void* param) {
- SExchangeInfo* pExInfo = (SExchangeInfo*) param;
+ SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
@@ -4524,7 +4414,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
-static SArray* extractPartitionColInfo(SNodeList* pNodeList);
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
@@ -4554,8 +4443,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS;
}
-int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){
- if(groupKey == NULL) {
+int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
+ if (groupKey == NULL) {
return TDB_CODE_SUCCESS;
}
@@ -4564,11 +4453,11 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t keyLen = 0;
- void *keyBuf = NULL;
+ void* keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
- keyLen += pCol->bytes; // actual data + null_flag
+ keyLen += pCol->bytes; // actual data + null_flag
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
@@ -4579,9 +4468,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY;
}
- for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){
- STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i);
- SMetaReader mr = {0};
+ for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
+ STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
+ SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid);
@@ -4590,23 +4479,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
- if(strcmp(pCol->name, "tbname") == 0){
+ if (strcmp(pCol->name, "tbname") == 0) {
isNull[i] = 0;
memcpy(pStart, mr.me.name, strlen(mr.me.name));
pStart += strlen(mr.me.name);
- }else{
+ } else {
STagVal tagVal = {0};
tagVal.cid = pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
- if(p == NULL){
+ if (p == NULL) {
isNull[j] = 1;
continue;
}
isNull[i] = 0;
if (pCol->type == TSDB_DATA_TYPE_JSON) {
-// int32_t dataLen = getJsonValueLen(pkey->pData);
-// memcpy(pStart, (pkey->pData), dataLen);
-// pStart += dataLen;
+ // int32_t dataLen = getJsonValueLen(pkey->pData);
+ // memcpy(pStart, (pkey->pData), dataLen);
+ // pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
memcpy(pStart, tagVal.pData, tagVal.nData);
pStart += tagVal.nData;
@@ -4618,7 +4507,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
- int32_t len = (int32_t) (pStart - (char*)keyBuf);
+ int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
if (groupId) {
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
@@ -4653,7 +4542,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
- SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
+ SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
@@ -4661,7 +4550,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
- SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
+ SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
@@ -4671,11 +4560,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
- SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
- generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
+ SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
+ generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
- SOperatorInfo* pOperator =
- createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
+ SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
@@ -4700,16 +4588,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
- SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
- int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
+ SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
+ int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
- if (code){
+ if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
- SOperatorInfo* pOperator =
- createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
+ SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
@@ -4718,7 +4605,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
- int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions);
+ int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
+ pScanPhyNode->node.pConditions);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
@@ -4796,7 +4684,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
- SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
+ SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
@@ -4871,12 +4759,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
- SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
- SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
- SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
-
- SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
- pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
+ pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
@@ -5034,7 +4917,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
}
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
- if(!pNodeList) return NULL;
+ if (!pNodeList) return NULL;
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
@@ -5139,7 +5022,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res);
- if (code == TSDB_CODE_INDEX_REBUILDING){ // todo
+ if (code == TSDB_CODE_INDEX_REBUILDING) { // todo
// doFilter();
} else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
@@ -5314,7 +5197,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
- EOPTR_EXEC_MODEL model) {
+ const char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
@@ -5324,6 +5207,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
+ (*pTaskInfo)->sql = sql;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
@@ -5499,4 +5383,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
}
return code;
}
-
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 688c576d03..23226a0134 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
- if (pInfo->pScalarExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
+ if (pInfo->scalarSup.pScalarExprInfo != NULL) {
+ pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
@@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
- pInfo->pScalarExprInfo = pScalarExprInfo;
- pInfo->numOfScalarExpr = numOfScalarExpr;
- pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
+ pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
+ pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
+ pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
@@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return NULL;
}
- SGroupbyOperatorInfo* pInfo = pOperator->info;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+
+ SPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
@@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break;
}
+ // there is an scalar expression that needs to be calculated right before apply the group aggregation.
+ if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
+ pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
+ if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, pTaskInfo->code);
+ }
+ }
+
doHashPartition(pOperator, pBlock);
}
@@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pInfo->columnOffset);
}
-SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
- SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) {
+SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
- pInfo->pGroupCols = pGroupColList;
+ SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
+
+ int32_t numOfCols = 0;
+ SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
+
+ pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
+
+ if (pPartNode->pExprs != NULL) {
+ pInfo->scalarSupp.numOfScalarExpr = 0;
+ pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
+ pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
+ pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
+ }
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
@@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
- getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
+ getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
- pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
- pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
- code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
+ pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
+ pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
+ code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
- pInfo->binfo.pRes = pResultBlock;
+ pInfo->binfo.pRes = pResBlock;
pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->info = pInfo;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 0823594858..1ffb1529d7 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1234,6 +1234,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
+ doFilter(pInfo->pCondition, pInfo->pRes);
+#if 0
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
@@ -1279,6 +1281,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
px->info.rows = numOfRow;
pInfo->pRes = px;
+#endif
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
@@ -1457,6 +1460,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
doFilterResult(pInfo);
+ blockDataDestroy(p);
+
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
@@ -1545,10 +1550,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
- // blockDataDestroy(p); todo handle memory leak
-
pInfo->pRes->info.rows = p->info.rows;
- return p->info.rows;
+ blockDataDestroy(p);
+
+ return pInfo->pRes->info.rows;
}
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c
index 1008a5263c..2f2080d5fe 100644
--- a/source/libs/executor/src/sortoperator.c
+++ b/source/libs/executor/src/sortoperator.c
@@ -420,24 +420,29 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
goto _error;
}
- pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
- pInfo->pSortInfo = pSortInfo;
+ pInfo->binfo.pRes = pResBlock;
+ pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo;
- pInfo->pInputBlock = pInputBlock;
- pOperator->name = "MultiwaySortMerge";
+ pInfo->pInputBlock = pInputBlock;
+ pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
-
- pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
- pInfo->sortBufSize = pInfo->bufPageSize * 16;
- pInfo->hasGroupId = false;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
+ pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->pTaskInfo = pTaskInfo;
+
+ pInfo->bufPageSize = getProperSortPageSize(rowSize);
+
+ uint32_t numOfSources = taosArrayGetSize(pSortInfo);
+ numOfSources = TMAX(2, numOfSources);
+
+ pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
+
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 0fcf52b1a4..f8972a02a1 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
continue;
}
- // if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
- // pCtx[k].start.key = INT64_MIN;
- // continue;
- // }
-
SFunctParam* pParam = &pCtx[k].param[0];
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index 1473bd81bb..5bcd58f8db 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -204,7 +204,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
- SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
+
+ if (taosArrayGetSize(pSource->pageIdList) == 0) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage);
@@ -532,6 +537,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0;
}
+int32_t getProperSortPageSize(size_t rowSize) {
+ uint32_t defaultPageSize = 4096;
+
+ uint32_t pgSize = 0;
+ if (rowSize * 4 > defaultPageSize) {
+ pgSize = rowSize * 4;
+ } else {
+ pgSize = defaultPageSize;
+ }
+
+ return pgSize;
+}
+
static int32_t createInitialSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
@@ -557,14 +575,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (!hasGroupId) {
// calculate the buffer pages according to the total available buffers.
- int32_t rowSize = blockDataGetRowSize(pBlock);
- if (rowSize * 4 > 4096) {
- pHandle->pageSize = rowSize * 4;
- } else {
- pHandle->pageSize = 4096;
- }
-
- // todo!!
+ pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
+
+ // todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
@@ -577,7 +590,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
- // todo relocate the columns
+
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) {
return code;
diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp
index 880ac6c78e..dba2fb897a 100644
--- a/source/libs/executor/test/executorTests.cpp
+++ b/source/libs/executor/test/executorTests.cpp
@@ -945,7 +945,7 @@ TEST(testCase, build_executor_tree_Test) {
int32_t code = qStringToSubplan(msg, &plan);
ASSERT_EQ(code, 0);
- code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
+ code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
ASSERT_EQ(code, 0);
}
#if 0
diff --git a/source/libs/function/inc/texpr.h b/source/libs/function/inc/texpr.h
deleted file mode 100644
index 68aedcb5a2..0000000000
--- a/source/libs/function/inc/texpr.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_COMMON_EXPR_H_
-#define _TD_COMMON_EXPR_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include "os.h"
-
-#include "tmsg.h"
-#include "taosdef.h"
-#include "tskiplist.h"
-#include "function.h"
-
-struct tExprNode;
-struct SSchema;
-
-#define QUERY_COND_REL_PREFIX_IN "IN|"
-#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
-#define QUERY_COND_REL_PREFIX_MATCH "MATCH|"
-#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|"
-
-#define QUERY_COND_REL_PREFIX_IN_LEN 3
-#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
-#define QUERY_COND_REL_PREFIX_MATCH_LEN 6
-#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7
-
-typedef bool (*__result_filter_fn_t)(const void *, void *);
-typedef void (*__do_filter_suppl_fn_t)(void *, void *);
-
-/**
- * this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
- */
-typedef struct tQueryInfo {
- uint8_t optr; // expression operator
- SSchema sch; // schema of tags
- char* q;
- __compar_fn_t compare; // filter function
- bool indexed; // indexed columns
-} tQueryInfo;
-
-typedef struct SExprTraverseSupp {
- __result_filter_fn_t nodeFilterFn;
- __do_filter_suppl_fn_t setupInfoFn;
- void *pExtInfo;
-} SExprTraverseSupp;
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_COMMON_EXPR_H_*/
diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c
index f04b4f17f9..039e0a9dfc 100644
--- a/source/libs/function/src/texpr.c
+++ b/source/libs/function/src/texpr.c
@@ -17,15 +17,7 @@
#include "os.h"
#include "texception.h"
-#include "taosdef.h"
#include "tmsg.h"
-#include "tarray.h"
-#include "tbuffer.h"
-#include "tcompare.h"
-#include "thash.h"
-#include "texpr.h"
-#include "tvariant.h"
-#include "tdef.h"
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c
index 1ef0ccf7f9..2e30b01357 100644
--- a/source/libs/nodes/src/nodesCloneFuncs.c
+++ b/source/libs/nodes/src/nodesCloneFuncs.c
@@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
+ CLONE_NODE_LIST_FIELD(pPartTags);
return (SNode*)pDst;
}
@@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys
COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
- CLONE_NODE_LIST_FIELD(pPartitionKeys);
+ CLONE_NODE_LIST_FIELD(pPartitionTags);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index 968bb75997..fb6a428d3c 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTagCond = "TagCond";
+static const char* jkScanLogicPlanPartTags = "PartTags";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
+ }
return code;
}
@@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
+ }
return code;
}
@@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
+static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
+ }
return code;
}
@@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
- ;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
@@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
+ }
+
return code;
}
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 41b80eaaa8..9d7cd0cf27 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
- (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) &&
- pNode->pParent->pParent &&
- QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) {
+ (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
+ QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) {
return true;
}
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
@@ -222,9 +221,8 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
static void setScanWindowInfo(SScanLogicNode* pScan) {
SLogicNode* pParent = pScan->node.pParent;
- if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) &&
- pParent->pParent &&
- QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
+ if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent &&
+ QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
pParent = pParent->pParent;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) {
@@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
}
+static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
+ if (QUERY_NODE_COLUMN == nodeType(pNode)) {
+ if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) {
+ *(bool*)pContext = true;
+ return DEAL_RES_END;
+ }
+ }
+ return DEAL_RES_CONTINUE;
+}
+
+static bool partTagsOptHasCol(SNodeList* pPartKeys) {
+ bool hasCol = false;
+ nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol);
+ return hasCol;
+}
+
+static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
+ if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
+ QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
+ return false;
+ }
+
+ return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
+}
+
+static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
+ SPartitionLogicNode* pPart =
+ (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
+ if (NULL == pPart) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
+ TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
+ int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
+ if (TSDB_CODE_SUCCESS == code) {
+ NODES_CLEAR_LIST(pPart->node.pChildren);
+ nodesDestroyNode((SNode*)pPart);
+ }
+ return code;
+}
+
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
- {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
+ {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
- {.pName = "SmaIndex", .optimizeFunc = smaOptimize}
+ {.pName = "SmaIndex", .optimizeFunc = smaOptimize},
+ {.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
};
// clang-format on
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index 08e693da71..2974b3ef8c 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
- if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) {
+ pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
+ if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
+ (NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
nodesDestroyNode((SNode*)pTableScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h
index 29861d87ac..ecff861f50 100644
--- a/source/libs/qworker/inc/qwMsg.h
+++ b/source/libs/qworker/inc/qwMsg.h
@@ -24,7 +24,7 @@ extern "C" {
#include "dataSinkMgt.h"
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
-int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
+int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c
index 848a0420ca..5635ec8fc6 100644
--- a/source/libs/qworker/src/qwMsg.c
+++ b/source/libs/qworker/src/qwMsg.c
@@ -308,10 +308,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
- taosMemoryFreeClear(sql);
-
- QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
+ QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c
index 451607e7d0..57ebb89ed2 100644
--- a/source/libs/qworker/src/qworker.c
+++ b/source/libs/qworker/src/qworker.c
@@ -510,7 +510,7 @@ _return:
}
-int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
+int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
@@ -537,7 +537,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
ctx->plan = plan;
- code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
+ code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
@@ -938,7 +938,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
ctx.plan = plan;
- code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
+ code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c
index 3e3f3639d7..31536f413d 100644
--- a/source/libs/scheduler/src/schJob.c
+++ b/source/libs/scheduler/src/schJob.c
@@ -1488,13 +1488,15 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bo
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
*job = pJob->refId;
+ if (!sync) {
+ pJob->userCb = SCH_EXEC_CB;
+ }
+
SCH_ERR_JRET(schLaunchJob(pJob));
if (sync) {
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
- } else {
- pJob->userCb = SCH_EXEC_CB;
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
diff --git a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
index 73be4a35bf..3532332174 100644
--- a/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
+++ b/tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
@@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
+system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
+system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
+system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
+system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
+system sh/cfg.sh -n dnode5 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
@@ -128,9 +133,9 @@ if $rows != 1 then
endi
if $data(2)[4] == leader then
$leaderExist = 1
- $leaderVnode = 4
- $follower1 = 2
- $follower2 = 3
+ $leaderVnode = 2
+ $follower1 = 3
+ $follower2 = 4
endi
if $data(2)[6] == leader then
$leaderExist = 1
@@ -140,9 +145,9 @@ if $data(2)[6] == leader then
endi
if $data(2)[8] == leader then
$leaderExist = 1
- $leaderVnode = 2
- $follower1 = 3
- $follower2 = 4
+ $leaderVnode = 4
+ $follower1 = 2
+ $follower2 = 3
endi
if $leaderExist != 1 then
goto step3
@@ -171,8 +176,6 @@ if $rows != 1 then
return -1
endi
-return
-
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py
index a10eaf1fb5..e44f327995 100644
--- a/tests/system-test/7-tmq/basic5.py
+++ b/tests/system-test/7-tmq/basic5.py
@@ -250,9 +250,26 @@ class TDTestCase:
# wait db ready
while 1:
tdSql.query("show databases")
- if tdSql.getRows() == 4:
- print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
- break
+ if tdSql.getRows() == 4:
+ print ('==================================================')
+ print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0))
+ index = 0
+ if tdSql.getData(0,0) == parameterDict['dbName']:
+ index = 0
+ elif tdSql.getData(1,0) == parameterDict['dbName']:
+ index = 1
+ elif tdSql.getData(2,0) == parameterDict['dbName']:
+ index = 2
+ elif tdSql.getData(3,0) == parameterDict['dbName']:
+ index = 3
+ else:
+ continue
+
+ if tdSql.getData(index,19) == 'ready':
+ print("******************** index: %d"%index)
+ break
+
+ continue
else:
time.sleep(1)
@@ -378,8 +395,27 @@ class TDTestCase:
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 5:
- print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
- break
+ print ('==================================================')
+ print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),tdSql.getData(3,0),tdSql.getData(4,0))
+ index = 0
+ if tdSql.getData(0,0) == parameterDict['dbName']:
+ index = 0
+ elif tdSql.getData(1,0) == parameterDict['dbName']:
+ index = 1
+ elif tdSql.getData(2,0) == parameterDict['dbName']:
+ index = 2
+ elif tdSql.getData(3,0) == parameterDict['dbName']:
+ index = 3
+ elif tdSql.getData(4,0) == parameterDict['dbName']:
+ index = 4
+ else:
+ continue
+
+ if tdSql.getData(index,19) == 'ready':
+ print("******************** index: %d"%index)
+ break
+
+ continue
else:
time.sleep(1)
diff --git a/tools/taos-tools b/tools/taos-tools
index 3d5aa76f8c..0a81480420 160000
--- a/tools/taos-tools
+++ b/tools/taos-tools
@@ -1 +1 @@
-Subproject commit 3d5aa76f8c718dcffa100b45e4cbf313d499c356
+Subproject commit 0a81480420d6601bbdb57770ee64e40f24c4ea83