diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 39d8ec3420..b15d5ad33a 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc {
bool interpQuery;
bool distinct;
bool join;
+ bool continueQuery;
} SMultiFunctionsDesc;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h
index 2793e72635..42d3307ac8 100644
--- a/include/libs/planner/plannerOp.h
+++ b/include/libs/planner/plannerOp.h
@@ -23,6 +23,7 @@
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif
+OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan)
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 34ab1fb05a..159a92b0ab 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
+ SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
+ pQueryStmtInfo->info.continueQuery = true;
+
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
@@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto _return;
}
+ printf("%s\n", pStr);
+
// The topic should be related to a database that the queried table is belonged to.
SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0};
diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp
index 440ef0d728..736a47273f 100644
--- a/source/client/test/clientTests.cpp
+++ b/source/client/test/clientTests.cpp
@@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) {
// taosHashCleanup(phash);
//}
//
-//TEST(testCase, create_topic_Test) {
-// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
-// assert(pConn != NULL);
-//
-// TAOS_RES* pRes = taos_query(pConn, "use abc1");
-// if (taos_errno(pRes) != 0) {
-// printf("error in use db, reason:%s\n", taos_errstr(pRes));
-// }
-// taos_free_result(pRes);
-//
-// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
-// ASSERT_TRUE(pFields == nullptr);
-//
-// int32_t numOfFields = taos_num_fields(pRes);
-// ASSERT_EQ(numOfFields, 0);
-//
-// taos_free_result(pRes);
-//
-// char* sql = "select * from tu";
-// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
-// taos_free_result(pRes);
-// taos_close(pConn);
-//}
-//
+TEST(testCase, create_topic_Test) {
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+ assert(pConn != NULL);
+
+ TAOS_RES* pRes = taos_query(pConn, "use abc1");
+ if (taos_errno(pRes) != 0) {
+ printf("error in use db, reason:%s\n", taos_errstr(pRes));
+ }
+ taos_free_result(pRes);
+
+ TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+ ASSERT_TRUE(pFields == nullptr);
+
+ int32_t numOfFields = taos_num_fields(pRes);
+ ASSERT_EQ(numOfFields, 0);
+
+ taos_free_result(pRes);
+
+ char* sql = "select * from tu";
+ pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
+ taos_free_result(pRes);
+ taos_close(pConn);
+}
+
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
@@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) {
// taos_close(pConn);
//}
-TEST(testCase, agg_query_tables) {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- ASSERT_NE(pConn, nullptr);
-
- TAOS_RES* pRes = taos_query(pConn, "use dbv");
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
- if (taos_errno(pRes) != 0) {
- printf("failed to create table, reason:%s\n", taos_errstr(pRes));
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "select count(*) from tu");
- if (taos_errno(pRes) != 0) {
- printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
- taos_free_result(pRes);
- ASSERT_TRUE(false);
- }
-
- TAOS_ROW pRow = NULL;
- TAOS_FIELD* pFields = taos_fetch_fields(pRes);
- int32_t numOfFields = taos_num_fields(pRes);
-
- char str[512] = {0};
- while ((pRow = taos_fetch_row(pRes)) != NULL) {
- int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
- printf("%s\n", str);
- }
-
- taos_free_result(pRes);
- taos_close(pConn);
-}
\ No newline at end of file
+//TEST(testCase, agg_query_tables) {
+// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+// ASSERT_NE(pConn, nullptr);
+//
+// TAOS_RES* pRes = taos_query(pConn, "use dbv");
+// taos_free_result(pRes);
+//
+// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
+// if (taos_errno(pRes) != 0) {
+// printf("failed to create table, reason:%s\n", taos_errstr(pRes));
+// }
+// taos_free_result(pRes);
+//
+// pRes = taos_query(pConn, "select count(*) from tu");
+// if (taos_errno(pRes) != 0) {
+// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
+// taos_free_result(pRes);
+// ASSERT_TRUE(false);
+// }
+//
+// TAOS_ROW pRow = NULL;
+// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+// int32_t numOfFields = taos_num_fields(pRes);
+//
+// char str[512] = {0};
+// while ((pRow = taos_fetch_row(pRes)) != NULL) {
+// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
+// printf("%s\n", str);
+// }
+//
+// taos_free_result(pRes);
+// taos_close(pConn);
+//}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index ead856a06b..e5f84a8d8d 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "../../../../../include/libs/executor/executor.h"
#include "tqInt.h"
#include "tqMetaStore.h"
@@ -766,3 +767,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
* status) {*/
/*return 0;*/
/*}*/
+
+static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
+ if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
+ return NULL;
+ }
+
+ // print those info into log
+ pMsg->sId = be64toh(pMsg->sId);
+ pMsg->queryId = be64toh(pMsg->queryId);
+ pMsg->taskId = be64toh(pMsg->taskId);
+ pMsg->contentLen = ntohl(pMsg->contentLen);
+
+ struct SSubplan *plan = NULL;
+ int32_t code = qStringToSubplan(pMsg->msg, &plan);
+ if (code != TSDB_CODE_SUCCESS) {
+ terrno = code;
+ return NULL;
+ }
+
+ qTaskInfo_t pTaskInfo = NULL;
+ code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
+ if (code != TSDB_CODE_SUCCESS) {
+ // TODO: destroy SSubplan & pTaskInfo
+ terrno = code;
+ return NULL;
+ }
+
+ return pTaskInfo;
+}
\ No newline at end of file
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 2fe3392b25..73a30a62f5 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -253,12 +253,8 @@ typedef struct SExecTaskInfo {
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads
-// tsem_t ready;
-// int32_t dataReady; // denote if query result is ready or not
-// void* rspContext; // response context
char *sql; // query sql string
jmp_buf env; //
- DataSinkHandle dsHandle;
struct SOperatorInfo *pRoot;
} SExecTaskInfo;
@@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
-int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
+int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index f39df4d4ae..f5c13ef782 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult);
}
-int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
- assert(tsdb != NULL && pSubplan != NULL);
+int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
+ assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
- int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
+ int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto _error;
}
- code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
+ code = dsCreateDataSinker(pSubplan->pDataSink, handle);
- *handle = (*pTask)->dsHandle;
-
-_error:
+ _error:
// if failed to add ref for all tables in this query, abort current query
return code;
}
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index dc4d9c7238..e2beb43fa8 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -7778,22 +7778,26 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
return NULL;
}
-int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
- tsdbReadHandleT tReaderHandle = NULL;
-
- int32_t code = 0;
+int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
uint64_t queryId = pPlan->id.queryId;
- SPhyNode* pPhyNode = pPlan->pNode;
-
+ int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId);
+ if (*pTaskInfo == NULL) {
+ code = TSDB_CODE_QRY_OUT_OF_MEMORY;
+ goto _complete;
+ }
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId);
if ((*pTaskInfo)->pRoot == NULL) {
- return terrno;
+ code = TSDB_CODE_QRY_OUT_OF_MEMORY;
}
- return TSDB_CODE_SUCCESS;
+_complete:
+ tfree(*pTaskInfo);
+
+ terrno = code;
+ return code;
}
/**
diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h
index 4ff8364198..41f50607cb 100644
--- a/source/libs/planner/inc/plannerInt.h
+++ b/source/libs/planner/inc/plannerInt.h
@@ -28,19 +28,20 @@ extern "C" {
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
-#define QNODE_PROJECT 3
-#define QNODE_AGGREGATE 4
-#define QNODE_GROUPBY 5
-#define QNODE_LIMIT 6
-#define QNODE_JOIN 7
-#define QNODE_DISTINCT 8
-#define QNODE_SORT 9
-#define QNODE_UNION 10
-#define QNODE_TIMEWINDOW 11
-#define QNODE_SESSIONWINDOW 12
-#define QNODE_STATEWINDOW 13
-#define QNODE_FILL 14
-#define QNODE_MODIFY 15
+#define QNODE_STREAMSCAN 3
+#define QNODE_PROJECT 4
+#define QNODE_AGGREGATE 5
+#define QNODE_GROUPBY 6
+#define QNODE_LIMIT 7
+#define QNODE_JOIN 8
+#define QNODE_DISTINCT 9
+#define QNODE_SORT 10
+#define QNODE_UNION 11
+#define QNODE_TIMEWINDOW 12
+#define QNODE_SESSIONWINDOW 13
+#define QNODE_STATEWINDOW 14
+#define QNODE_FILL 15
+#define QNODE_MODIFY 16
typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not
diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c
index 9c9ff8fe2b..e817b764d5 100644
--- a/source/libs/planner/src/logicPlan.c
+++ b/source/libs/planner/src/logicPlan.c
@@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch(type) {
case QNODE_TAGSCAN:
+ case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: {
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
@@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
return pNode;
}
- SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
+ SQueryPlanNode* pNode = NULL;
+ if (pQueryInfo->info.continueQuery) {
+ pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info);
+ } else {
+ pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
+ }
if (!pQueryInfo->info.projectionQuery) {
SArray* p = pQueryInfo->exprList[0];
@@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
pNode->numOfExpr = num;
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
taosArrayAddAll(pNode->pExpr, p);
-// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
}
}
@@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
int32_t len = len1 + totalLen;
switch(pQueryNode->info.type) {
+ case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: {
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
@@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
assert(pQueryNode);
-
*str = calloc(1, 4096);
int32_t len = sprintf(*str, "===== logic plan =====\n");
diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c
index 1e1a97df1f..dd869d87b3 100644
--- a/source/libs/planner/src/physicalPlan.c
+++ b/source/libs/planner/src/physicalPlan.c
@@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
- return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
+ int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
+ return createUserTableScanNode(pPlanNode, pTable, type);
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
@@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode);
break;
+ case QNODE_STREAMSCAN:
case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode);
break;
diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c
index 2abb90993b..cf54fdec85 100644
--- a/source/libs/planner/src/physicalPlanJson.c
+++ b/source/libs/planner/src/physicalPlanJson.c
@@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) {
+ case OP_StreamScan:
case OP_TableScan:
case OP_DataBlocksOptScan:
case OP_TableSeqScan:
diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c
index 9b32213ad7..6b3f37741e 100644
--- a/source/libs/planner/src/planner.c
+++ b/source/libs/planner/src/planner.c
@@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if (pLogicPlan->info.type != QNODE_MODIFY) {
-// char* str = NULL;
-// queryPlanToString(pLogicPlan, &str);
-// printf("%s\n", str);
+ char* str = NULL;
+ queryPlanToString(pLogicPlan, &str);
+ printf("%s\n", str);
}
code = optimizeQueryPlan(pLogicPlan);