[td-11818] refactor.
This commit is contained in:
parent
41fb517f0b
commit
09ba6a611d
|
@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc {
|
|||
bool interpQuery;
|
||||
bool distinct;
|
||||
bool join;
|
||||
bool continueQuery;
|
||||
} SMultiFunctionsDesc;
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
|
||||
#endif
|
||||
|
||||
OP_ENUM_MACRO(StreamScan)
|
||||
OP_ENUM_MACRO(TableScan)
|
||||
OP_ENUM_MACRO(DataBlocksOptScan)
|
||||
OP_ENUM_MACRO(TableSeqScan)
|
||||
|
|
|
@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
|
|||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
|
||||
|
||||
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
|
||||
pQueryStmtInfo->info.continueQuery = true;
|
||||
|
||||
// todo check for invalid sql statement and return with error code
|
||||
|
||||
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
|
||||
|
@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
|
|||
goto _return;
|
||||
}
|
||||
|
||||
printf("%s\n", pStr);
|
||||
|
||||
// The topic should be related to a database that the queried table is belonged to.
|
||||
SName name = {0};
|
||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||
|
|
|
@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) {
|
|||
// taosHashCleanup(phash);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, create_topic_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// ASSERT_TRUE(pFields == nullptr);
|
||||
//
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
// ASSERT_EQ(numOfFields, 0);
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// char* sql = "select * from tu";
|
||||
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
TEST(testCase, create_topic_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == nullptr);
|
||||
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
ASSERT_EQ(numOfFields, 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
char* sql = "select * from tu";
|
||||
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, insert_test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// ASSERT_NE(pConn, nullptr);
|
||||
|
@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) {
|
|||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TEST(testCase, agg_query_tables) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use dbv");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "select count(*) from tu");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
TAOS_ROW pRow = NULL;
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
//TEST(testCase, agg_query_tables) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// ASSERT_NE(pConn, nullptr);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use dbv");
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to create table, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// pRes = taos_query(pConn, "select count(*) from tu");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
// taos_free_result(pRes);
|
||||
// ASSERT_TRUE(false);
|
||||
// }
|
||||
//
|
||||
// TAOS_ROW pRow = NULL;
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "../../../../../include/libs/executor/executor.h"
|
||||
#include "tqInt.h"
|
||||
#include "tqMetaStore.h"
|
||||
|
||||
|
@ -766,3 +767,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
* status) {*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
|
||||
static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
|
||||
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// print those info into log
|
||||
pMsg->sId = be64toh(pMsg->sId);
|
||||
pMsg->queryId = be64toh(pMsg->queryId);
|
||||
pMsg->taskId = be64toh(pMsg->taskId);
|
||||
pMsg->contentLen = ntohl(pMsg->contentLen);
|
||||
|
||||
struct SSubplan *plan = NULL;
|
||||
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO: destroy SSubplan & pTaskInfo
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pTaskInfo;
|
||||
}
|
|
@ -253,12 +253,8 @@ typedef struct SExecTaskInfo {
|
|||
|
||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||
pthread_mutex_t lock; // used to synchronize the rsp/query threads
|
||||
// tsem_t ready;
|
||||
// int32_t dataReady; // denote if query result is ready or not
|
||||
// void* rspContext; // response context
|
||||
char *sql; // query sql string
|
||||
jmp_buf env; //
|
||||
DataSinkHandle dsHandle;
|
||||
struct SOperatorInfo *pRoot;
|
||||
} SExecTaskInfo;
|
||||
|
||||
|
@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec();
|
|||
|
||||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
|
||||
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
|
||||
|
||||
#endif // TDENGINE_EXECUTORIMPL_H
|
||||
|
|
|
@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
|
|||
tfree(param->prevResult);
|
||||
}
|
||||
|
||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||
assert(tsdb != NULL && pSubplan != NULL);
|
||||
int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||
assert(readHandle != NULL && pSubplan != NULL);
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
|
|||
goto _error;
|
||||
}
|
||||
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
||||
|
||||
*handle = (*pTask)->dsHandle;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -7778,22 +7778,26 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
|
||||
tsdbReadHandleT tReaderHandle = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
|
||||
uint64_t queryId = pPlan->id.queryId;
|
||||
|
||||
SPhyNode* pPhyNode = pPlan->pNode;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pTaskInfo = createExecTaskInfo(queryId);
|
||||
if (*pTaskInfo == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _complete;
|
||||
}
|
||||
|
||||
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId);
|
||||
if ((*pTaskInfo)->pRoot == NULL) {
|
||||
return terrno;
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_complete:
|
||||
tfree(*pTaskInfo);
|
||||
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,19 +28,20 @@ extern "C" {
|
|||
|
||||
#define QNODE_TAGSCAN 1
|
||||
#define QNODE_TABLESCAN 2
|
||||
#define QNODE_PROJECT 3
|
||||
#define QNODE_AGGREGATE 4
|
||||
#define QNODE_GROUPBY 5
|
||||
#define QNODE_LIMIT 6
|
||||
#define QNODE_JOIN 7
|
||||
#define QNODE_DISTINCT 8
|
||||
#define QNODE_SORT 9
|
||||
#define QNODE_UNION 10
|
||||
#define QNODE_TIMEWINDOW 11
|
||||
#define QNODE_SESSIONWINDOW 12
|
||||
#define QNODE_STATEWINDOW 13
|
||||
#define QNODE_FILL 14
|
||||
#define QNODE_MODIFY 15
|
||||
#define QNODE_STREAMSCAN 3
|
||||
#define QNODE_PROJECT 4
|
||||
#define QNODE_AGGREGATE 5
|
||||
#define QNODE_GROUPBY 6
|
||||
#define QNODE_LIMIT 7
|
||||
#define QNODE_JOIN 8
|
||||
#define QNODE_DISTINCT 9
|
||||
#define QNODE_SORT 10
|
||||
#define QNODE_UNION 11
|
||||
#define QNODE_TIMEWINDOW 12
|
||||
#define QNODE_SESSIONWINDOW 13
|
||||
#define QNODE_STATEWINDOW 14
|
||||
#define QNODE_FILL 15
|
||||
#define QNODE_MODIFY 16
|
||||
|
||||
typedef struct SQueryDistPlanNodeInfo {
|
||||
bool stableQuery; // super table query or not
|
||||
|
|
|
@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
|
|||
|
||||
switch(type) {
|
||||
case QNODE_TAGSCAN:
|
||||
case QNODE_STREAMSCAN:
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
|
||||
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
|
||||
|
@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
|
|||
return pNode;
|
||||
}
|
||||
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
||||
SQueryPlanNode* pNode = NULL;
|
||||
if (pQueryInfo->info.continueQuery) {
|
||||
pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info);
|
||||
} else {
|
||||
pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
||||
}
|
||||
|
||||
if (!pQueryInfo->info.projectionQuery) {
|
||||
SArray* p = pQueryInfo->exprList[0];
|
||||
|
@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
|
|||
pNode->numOfExpr = num;
|
||||
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
|
||||
taosArrayAddAll(pNode->pExpr, p);
|
||||
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
|
|||
int32_t len = len1 + totalLen;
|
||||
|
||||
switch(pQueryNode->info.type) {
|
||||
case QNODE_STREAMSCAN:
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
|
||||
|
@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
|
|||
|
||||
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
|
||||
assert(pQueryNode);
|
||||
|
||||
*str = calloc(1, 4096);
|
||||
|
||||
int32_t len = sprintf(*str, "===== logic plan =====\n");
|
||||
|
|
|
@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
|
|||
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
|
||||
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
|
||||
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
|
||||
int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
|
||||
return createUserTableScanNode(pPlanNode, pTable, type);
|
||||
}
|
||||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
|
@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
case QNODE_TAGSCAN:
|
||||
node = createTagScanNode(pPlanNode);
|
||||
break;
|
||||
case QNODE_STREAMSCAN:
|
||||
case QNODE_TABLESCAN:
|
||||
node = createTableScanNode(pCxt, pPlanNode);
|
||||
break;
|
||||
|
|
|
@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
|
|||
static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
|
||||
const SPhyNode* phyNode = (const SPhyNode*)obj;
|
||||
switch (phyNode->info.type) {
|
||||
case OP_StreamScan:
|
||||
case OP_TableScan:
|
||||
case OP_DataBlocksOptScan:
|
||||
case OP_TableSeqScan:
|
||||
|
|
|
@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
|||
}
|
||||
|
||||
if (pLogicPlan->info.type != QNODE_MODIFY) {
|
||||
// char* str = NULL;
|
||||
// queryPlanToString(pLogicPlan, &str);
|
||||
// printf("%s\n", str);
|
||||
char* str = NULL;
|
||||
queryPlanToString(pLogicPlan, &str);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
code = optimizeQueryPlan(pLogicPlan);
|
||||
|
|
Loading…
Reference in New Issue