From d8d67b93b1ce7c59d3a25a09305eba3c8f7edde2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 20 Jan 2022 17:46:41 -0800 Subject: [PATCH 1/5] minor changes --- tests/test/c/create_table.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index 080f1551c2..b8d8123380 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -85,7 +85,7 @@ void createDbAndStb() { } taos_free_result(pRes); - sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j int)", stbName); + sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j bigint)", stbName); pRes = taos_query(con, qstr); code = taos_errno(pRes); if (code != 0) { From 93f950c740c6472d6637f0266a51bac1c867aa14 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 10:28:41 +0800 Subject: [PATCH 2/5] refine query interface --- include/common/tmsg.h | 19 ++++++- include/libs/qcom/query.h | 61 +++++++++++++++++++++- include/libs/transport/trpc.h | 50 ------------------ source/dnode/mnode/impl/inc/mndDef.h | 15 ++++-- source/dnode/mnode/impl/src/mndSubscribe.c | 18 ++++++- source/dnode/vnode/src/tq/tq.c | 6 +-- 6 files changed, 105 insertions(+), 64 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 01b142e333..b468456cb7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq { char* sql; char* logicalPlan; char* physicalPlan; - SArray* tasks; // SArray + SSubQueryMsg msg; } SMqSetCVgReq; +static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen; + if (buf == NULL) return tlen; + memcpy(*buf, pMsg, tlen); + *buf = POINTER_SHIFT(*buf, tlen); + return tlen; +} + +static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen; + memcpy(pMsg, buf, tlen); + return POINTER_SHIFT(buf, tlen); +} + static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->vgId); @@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - pReq->tasks = NULL; + buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 02207c4d1b..1925f0e3bd 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -109,12 +109,71 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -const SSchema* tGetTbnameColumnSchema(); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr { + int32_t nodeId; // vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) { + pEpSet->inUse = pAddr->inUse; + pEpSet->numOfEps = pAddr->numOfEps; + for (int j = 0; j < TSDB_MAX_REPLICA; j++) { + pEpSet->port[j] = pAddr->epAddr[j].port; + memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN); + } +} + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + void initQueryModuleMsgHandle(); +const SSchema* tGetTbnameColumnSchema(); +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); + #define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 25e295f980..5afafa08a3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); - -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr { - int32_t nodeId; // vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 772b9bf079..aaedf280b5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer { #endif typedef struct SMqConsumerEp { - int32_t vgId; // -1 for unassigned - SEpSet epset; - int64_t consumerId; // -1 for unassigned - int64_t lastConsumerHbTs; - int64_t lastVgHbTs; + int32_t vgId; // -1 for unassigned + SEpSet epset; + int64_t consumerId; // -1 for unassigned + int64_t lastConsumerHbTs; + int64_t lastVgHbTs; + int32_t execLen; + SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { @@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); return tlen; } @@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); + pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; return buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4cf7505f74..a6634a9f01 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { strcpy(req.sql, pTopic->sql); strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.physicalPlan, pTopic->physicalPlan); + memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { SMqConsumerEp CEp; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - int32_t sz; + //convert phyplan to dag + SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); + SArray *pArray; + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + + } + int32_t sz = taosArrayGetSize(pArray); + //convert dag to msg + for (int32_t i = 0; i < sz; i++) { + STaskInfo* pTaskInfo = taosArrayGet(pArray, i); + int32_t vgId = pTaskInfo->addr.nodeId; + SEpSet epSet; + tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr); + } + /*pTopic->physicalPlan;*/ SVgObj *pVgroup = NULL; SSdb *pSdb = pMnode->pSdb; void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); @@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); } return 0; + qDestroyQueryDag(pDag); } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ead856a06b..cbc948b95e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { // TODO: handle error } - ASSERT(taosArrayGetSize(pArray) == 0); STaskInfo *pInfo = taosArrayGet(pArray, 0); SArray* pTasks; schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); @@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) // TODO: filter out unused column return 0; } + SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); @@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayPush(pArray, &colInfo); return pArray; } -/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t - * status) {*/ -/*return 0;*/ -/*}*/ From 09ba6a611d8d0d0fedf7fdcfb0909b6000e7227e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 11:19:24 +0800 Subject: [PATCH 3/5] [td-11818] refactor. --- include/libs/function/function.h | 1 + include/libs/planner/plannerOp.h | 1 + source/client/src/clientImpl.c | 5 + source/client/test/clientTests.cpp | 114 ++++++++++----------- source/dnode/vnode/src/tq/tq.c | 30 ++++++ source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executorMain.c | 12 +-- source/libs/executor/src/executorimpl.c | 20 ++-- source/libs/planner/inc/plannerInt.h | 27 ++--- source/libs/planner/src/logicPlan.c | 11 +- source/libs/planner/src/physicalPlan.c | 4 +- source/libs/planner/src/physicalPlanJson.c | 1 + source/libs/planner/src/planner.c | 6 +- 13 files changed, 141 insertions(+), 97 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 39d8ec3420..b15d5ad33a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc { bool interpQuery; bool distinct; bool join; + bool continueQuery; } SMultiFunctionsDesc; int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 2793e72635..42d3307ac8 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -23,6 +23,7 @@ #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME #endif +OP_ENUM_MACRO(StreamScan) OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableSeqScan) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 34ab1fb05a..159a92b0ab 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); + SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode; + pQueryStmtInfo->info.continueQuery = true; + // todo check for invalid sql statement and return with error code CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); @@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, goto _return; } + printf("%s\n", pStr); + // The topic should be related to a database that the queried table is belonged to. SName name = {0}; char dbName[TSDB_DB_FNAME_LEN] = {0}; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 440ef0d728..736a47273f 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) { // taosHashCleanup(phash); //} // -//TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == nullptr); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// char* sql = "select * from tu"; -// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -// taos_free_result(pRes); -// taos_close(pConn); -//} -// +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); @@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) { // taos_close(pConn); //} -TEST(testCase, agg_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use dbv"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "select count(*) from tu"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} \ No newline at end of file +//TEST(testCase, agg_query_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use dbv"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select count(*) from tu"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ead856a06b..e5f84a8d8d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "../../../../../include/libs/executor/executor.h" #include "tqInt.h" #include "tqMetaStore.h" @@ -766,3 +767,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { * status) {*/ /*return 0;*/ /*}*/ + +static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { + if (pMsg == NULL || pStreamBlockReadHandle == NULL) { + return NULL; + } + + // print those info into log + pMsg->sId = be64toh(pMsg->sId); + pMsg->queryId = be64toh(pMsg->queryId); + pMsg->taskId = be64toh(pMsg->taskId); + pMsg->contentLen = ntohl(pMsg->contentLen); + + struct SSubplan *plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + qTaskInfo_t pTaskInfo = NULL; + code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); + if (code != TSDB_CODE_SUCCESS) { + // TODO: destroy SSubplan & pTaskInfo + terrno = code; + return NULL; + } + + return pTaskInfo; +} \ No newline at end of file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2fe3392b25..73a30a62f5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -253,12 +253,8 @@ typedef struct SExecTaskInfo { STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure pthread_mutex_t lock; // used to synchronize the rsp/query threads -// tsem_t ready; -// int32_t dataReady; // denote if query result is ready or not -// void* rspContext; // response context char *sql; // query sql string jmp_buf env; // - DataSinkHandle dsHandle; struct SOperatorInfo *pRoot; } SExecTaskInfo; @@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index f39df4d4ae..f5c13ef782 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -69,11 +69,11 @@ void freeParam(STaskParam *param) { tfree(param->prevResult); } -int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { - assert(tsdb != NULL && pSubplan != NULL); +int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { + assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ goto _error; } - code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); + code = dsCreateDataSinker(pSubplan->pDataSink, handle); - *handle = (*pTask)->dsHandle; - -_error: + _error: // if failed to add ref for all tables in this query, abort current query return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index dc4d9c7238..e2beb43fa8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7778,22 +7778,26 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, return NULL; } -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { - tsdbReadHandleT tReaderHandle = NULL; - - int32_t code = 0; +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { uint64_t queryId = pPlan->id.queryId; - SPhyNode* pPhyNode = pPlan->pNode; - + int32_t code = TSDB_CODE_SUCCESS; *pTaskInfo = createExecTaskInfo(queryId); + if (*pTaskInfo == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _complete; + } (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); if ((*pTaskInfo)->pRoot == NULL) { - return terrno; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; } - return TSDB_CODE_SUCCESS; +_complete: + tfree(*pTaskInfo); + + terrno = code; + return code; } /** diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 4ff8364198..41f50607cb 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -28,19 +28,20 @@ extern "C" { #define QNODE_TAGSCAN 1 #define QNODE_TABLESCAN 2 -#define QNODE_PROJECT 3 -#define QNODE_AGGREGATE 4 -#define QNODE_GROUPBY 5 -#define QNODE_LIMIT 6 -#define QNODE_JOIN 7 -#define QNODE_DISTINCT 8 -#define QNODE_SORT 9 -#define QNODE_UNION 10 -#define QNODE_TIMEWINDOW 11 -#define QNODE_SESSIONWINDOW 12 -#define QNODE_STATEWINDOW 13 -#define QNODE_FILL 14 -#define QNODE_MODIFY 15 +#define QNODE_STREAMSCAN 3 +#define QNODE_PROJECT 4 +#define QNODE_AGGREGATE 5 +#define QNODE_GROUPBY 6 +#define QNODE_LIMIT 7 +#define QNODE_JOIN 8 +#define QNODE_DISTINCT 9 +#define QNODE_SORT 10 +#define QNODE_UNION 11 +#define QNODE_TIMEWINDOW 12 +#define QNODE_SESSIONWINDOW 13 +#define QNODE_STATEWINDOW 14 +#define QNODE_FILL 15 +#define QNODE_MODIFY 16 typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 9c9ff8fe2b..e817b764d5 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla switch(type) { case QNODE_TAGSCAN: + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); @@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ return pNode; } - SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); + SQueryPlanNode* pNode = NULL; + if (pQueryInfo->info.continueQuery) { + pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info); + } else { + pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); + } if (!pQueryInfo->info.projectionQuery) { SArray* p = pQueryInfo->exprList[0]; @@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* pNode->numOfExpr = num; pNode->pExpr = taosArrayInit(num, POINTER_BYTES); taosArrayAddAll(pNode->pExpr, p); -// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); } } @@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t len = len1 + totalLen; switch(pQueryNode->info.type) { + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); @@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { assert(pQueryNode); - *str = calloc(1, 4096); int32_t len = sprintf(*str, "===== logic plan =====\n"); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 1e1a97df1f..dd869d87b3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); - return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; + return createUserTableScanNode(pPlanNode, pTable, type); } static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TAGSCAN: node = createTagScanNode(pPlanNode); break; + case QNODE_STREAMSCAN: case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 2abb90993b..cf54fdec85 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) { const SPhyNode* phyNode = (const SPhyNode*)obj; switch (phyNode->info.type) { + case OP_StreamScan: case OP_TableScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 9b32213ad7..6b3f37741e 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { -// char* str = NULL; -// queryPlanToString(pLogicPlan, &str); -// printf("%s\n", str); + char* str = NULL; + queryPlanToString(pLogicPlan, &str); + printf("%s\n", str); } code = optimizeQueryPlan(pLogicPlan); From 1e7c23cd3cf9c22ece49ea4b10184f03403dd0dc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 11:30:03 +0800 Subject: [PATCH 4/5] [td-11818] comment the print out. --- source/libs/planner/src/planner.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 6b3f37741e..9b32213ad7 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { - char* str = NULL; - queryPlanToString(pLogicPlan, &str); - printf("%s\n", str); +// char* str = NULL; +// queryPlanToString(pLogicPlan, &str); +// printf("%s\n", str); } code = optimizeQueryPlan(pLogicPlan); From 03ee26ed90878280d748d811e8b3ece2af7634ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 11:55:13 +0800 Subject: [PATCH 5/5] [td-11818] fix error. --- source/libs/executor/src/executorimpl.c | 3 +++ source/libs/qworker/src/qworker.c | 6 +++--- source/libs/scheduler/src/scheduler.c | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e2beb43fa8..0ed480ed15 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7791,8 +7791,11 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); if ((*pTaskInfo)->pRoot == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _complete; } + return code; + _complete: tfree(*pTaskInfo); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 566356e255..07f1b0f858 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task string to subplan failed, code:%x", code); + QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { - QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); + QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } - if ((pTaskInfo && NULL == sinkHandle) || (NULL == pTaskInfo && sinkHandle)) { + if (NULL == sinkHandle || NULL == pTaskInfo) { QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d8904cdfa9..ddfa73f0a5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -666,12 +666,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) int32_t taskDone = 0; int32_t code = 0; - SCH_TASK_DLOG("taskOnFailure, code:%x", errCode); + SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); + SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { code = schMoveTaskToFailList(pJob, pTask, &moved);