From 04487db683266e9ce967c9de61cc147cfe932799 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 13:38:08 +0800 Subject: [PATCH 1/6] [td-11818]Fix memory leak. --- source/client/src/clientEnv.c | 4 + source/client/src/clientImpl.c | 2 + source/client/src/clientMsgHandler.c | 20 +++-- source/client/test/clientTests.cpp | 112 ++++++++++++------------- source/dnode/mnode/impl/src/mndTopic.c | 2 +- source/libs/parser/src/dCDAstProcess.c | 5 +- source/libs/parser/src/parser.c | 26 +++--- 7 files changed, 93 insertions(+), 78 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index f747ccf3b6..d2696fb355 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -196,6 +196,10 @@ static void doDestroyRequest(void* p) { doFreeReqResultInfo(&pRequest->body.resInfo); qDestroyQueryDag(pRequest->body.pDag); + if (pRequest->body.showInfo.pArray != NULL) { + taosArrayDestroy(pRequest->body.showInfo.pArray); + } + deregisterRequest(pRequest); tfree(pRequest); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 159a92b0ab..42f9378a4e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -666,6 +666,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { if (pMsg->contLen > 0) { buf.pData = calloc(1, pMsg->contLen); + printf("create------------>%p\n", buf.pData); + if (buf.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; pMsg->code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 02e36043dc..ec088eb073 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -145,19 +145,23 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { } pSchema = pMetaMsg->pSchema; - TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD)); - for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) { - tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name)); - pFields[i].type = pSchema[i].type; - pFields[i].bytes = pSchema[i].bytes; - } + tfree(pRequest->body.resInfo.pRspMsg); pRequest->body.resInfo.pRspMsg = pMsg->pData; SReqResultInfo* pResInfo = &pRequest->body.resInfo; - pResInfo->fields = pFields; - pResInfo->numOfCols = pMetaMsg->numOfColumns; + if (pResInfo->fields == NULL) { + TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD)); + for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) { + tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name)); + pFields[i].type = pSchema[i].type; + pFields[i].bytes = pSchema[i].bytes; + } + pResInfo->fields = pFields; + } + + pResInfo->numOfCols = pMetaMsg->numOfColumns; pRequest->body.showInfo.execId = pShow->showId; // todo diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 736a47273f..891a6cfbf4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -451,39 +451,39 @@ TEST(testCase, driverInit_Test) { // // taos_close(pConn); //} -// -//TEST(testCase, show_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// } -// -// pRes = taos_query(pConn, "show abc1.tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// int32_t count = 0; -// char str[512] = {0}; -// -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%d: %s\n", ++count, str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// + +TEST(testCase, show_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + } + + pRes = taos_query(pConn, "show abc1.tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + int32_t count = 0; + char str[512] = {0}; + + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%d: %s\n", ++count, str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, drop_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -525,29 +525,29 @@ TEST(testCase, driverInit_Test) { // taosHashCleanup(phash); //} // -TEST(testCase, create_topic_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == nullptr); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_free_result(pRes); - - char* sql = "select * from tu"; - pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, create_topic_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == nullptr); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// taos_free_result(pRes); +// +// char* sql = "select * from tu"; +// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); +// taos_free_result(pRes); +// taos_close(pConn); +//} //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 1d4cbf37ce..ac66e7d88b 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char)); - SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); pTopic->logicalPlan = calloc(len + 1, sizeof(char)); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 5852678880..994875c0a3 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -62,9 +62,8 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out pEpSet->port[i] = info->epAddr[i].port; } - *outputLen = sizeof(SVShowTablesReq); - *output = pShowReq; - + *outputLen = sizeof(SVShowTablesReq); + *output = pShowReq; *pExtension = array; } else { if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 58e368aa0d..4271aae451 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -36,25 +36,29 @@ bool qIsDdlQuery(const SQueryNode* pQueryNode) { } int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { + int32_t code = TSDB_CODE_SUCCESS; + SSqlInfo info = doGenerateAST(pCxt->pSql); if (!info.valid) { strncpy(pCxt->pMsg, info.msg, pCxt->msgLen); - terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; - return terrno; + code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + goto _end; } if (!isDqlSqlStatement(&info)) { if (info.type == TSDB_SQL_CREATE_TABLE) { SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen); if (pModifStmtInfo == NULL) { - return terrno; + code = terrno; + goto _end; } *pQuery = (SQueryNode*)pModifStmtInfo; } else { SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen); if (pDcl == NULL) { - return terrno; + code = terrno; + goto _end; } *pQuery = (SQueryNode*)pDcl; @@ -63,21 +67,22 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } else { SQueryStmtInfo* pQueryInfo = createQueryInfo(); if (pQueryInfo == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code. - return terrno; + code = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code. + goto _end; } - int32_t code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen); + code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen); if (code == TSDB_CODE_SUCCESS) { *pQuery = (SQueryNode*)pQueryInfo; } else { - terrno = code; - return code; + goto _end; } } + _end: destroySqlInfo(&info); - return TSDB_CODE_SUCCESS; + terrno = code; + return code; } int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) { @@ -247,5 +252,6 @@ void qDestroyQuery(SQueryNode* pQueryNode) { SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode; taosArrayDestroy(pModifInfo->pDataBlocks); } + tfree(pQueryNode); } From 4c4be3c575bdccd492afb3abc1603488786a377c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 14:21:13 +0800 Subject: [PATCH 2/6] put createStreamExecTaskInfo into right place --- include/libs/executor/executor.h | 4 +++- source/dnode/vnode/inc/tq.h | 15 --------------- source/dnode/vnode/src/tq/tq.c | 24 +++++++++--------------- source/libs/executor/src/executor.c | 10 ++++++---- 4 files changed, 18 insertions(+), 35 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a98bb8f51a..36cc0f2665 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,9 @@ struct SSubplan; * @param pStreamBlockReadHandle * @return */ -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); + +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input); /** * Create the exec task object according to task json diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8c4effa221..f49542b5ec 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -82,27 +82,12 @@ typedef struct STqSubscribeReq { int64_t topic[]; } STqSubscribeReq; -typedef struct STqSubscribeRsp { - STqMsgHead head; - int64_t vgId; - char ep[TSDB_EP_LEN]; // TSDB_EP_LEN -} STqSubscribeRsp; - typedef struct STqHeartbeatReq { } STqHeartbeatReq; typedef struct STqHeartbeatRsp { } STqHeartbeatRsp; -typedef struct STqTopicVhandle { - int64_t topicId; - // executor for filter - void* filterExec; - // callback for mnode - // trigger when vnode list associated topic change - void* (*mCallback)(void*, void*); -} STqTopicVhandle; - #define TQ_BUFFER_SIZE 8 typedef struct STqExec { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89d4af48fd..52c541dcfd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -633,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { // read until find TDMT_VND_SUBMIT } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + void* task = pHandle->buffer.output[pos].task; - /*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/ + qStreamExecTaskSetInput(task, pCont); + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + } // TODO: launch query and get output data - void* outputData; - pHandle->buffer.output[pos].dst = outputData; + pHandle->buffer.output[pos].dst = pDataBlock; if (pHandle->buffer.firstOffset == -1 || pReq->offset < pHandle->buffer.firstOffset) { pHandle->buffer.firstOffset = pReq->offset; @@ -674,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan); - SArray *pArray; - //TODO: deserialize to SQueryDag - SQueryDag *pDag; - // convert to task - if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - // TODO: handle error - } - STaskInfo *pInfo = taosArrayGet(pArray, 0); - SArray* pTasks; - schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); + pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); pTopic->buffer.output[i].status = 0; - pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL); } pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a933402296..49bf42f383 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -13,10 +13,12 @@ * along with this program. If not, see . */ -#include "planner.h" #include "executor.h" +#include "planner.h" -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {} + +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) { if (pMsg == NULL || pStreamBlockReadHandle == NULL) { return NULL; } @@ -27,8 +29,8 @@ qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadH pMsg->taskId = be64toh(pMsg->taskId); pMsg->contentLen = ntohl(pMsg->contentLen); - struct SSubplan *plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + struct SSubplan* plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; From e1d9fa73b0e2f4c00390cafd8a88047ac877e945 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 15:16:17 +0800 Subject: [PATCH 3/6] [td-11818] Support create topic. --- include/libs/executor/executor.h | 40 ++++++++++----------- source/libs/executor/src/executor.c | 46 ++++++++++++++++++++++--- source/libs/executor/src/executorimpl.c | 20 +++++++++-- 3 files changed, 79 insertions(+), 27 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 36cc0f2665..457245e9a3 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,23 +29,23 @@ struct SSubplan; /** * Create the exec task for streaming mode * @param pMsg - * @param pStreamBlockReadHandle + * @param streamReadHandle * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); -void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input); +int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); /** * Create the exec task object according to task json - * @param tsdb + * @param readHandle * @param vgId * @param pTaskInfoMsg * @param pTaskInfo * @param qId * @return */ -int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, @@ -62,63 +62,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); * this function will be blocked to wait for the query execution completed or paused, * in which case enough results have been produced already. * - * @param qinfo + * @param tinfo * @return */ -int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext); +int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext); /** * * Retrieve the actual results to fill the response message payload. * Note that this function must be executed after qRetrieveQueryResultInfo is invoked. * - * @param qinfo qinfo object + * @param tinfo tinfo object * @param pRsp response message * @param contLen payload length * @return */ -//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); +//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); /** * return the transporter context (RPC) - * @param qinfo + * @param tinfo * @return */ -void* qGetResultRetrieveMsg(qTaskInfo_t qinfo); +void* qGetResultRetrieveMsg(qTaskInfo_t tinfo); /** * kill the ongoing query and free the query handle and corresponding resources automatically - * @param qinfo qhandle + * @param tinfo qhandle * @return */ -int32_t qKillTask(qTaskInfo_t qinfo); +int32_t qKillTask(qTaskInfo_t tinfo); /** * kill the ongoing query asynchronously - * @param qinfo qhandle + * @param tinfo qhandle * @return */ -int32_t qAsyncKillTask(qTaskInfo_t qinfo); +int32_t qAsyncKillTask(qTaskInfo_t tinfo); /** * return whether query is completed or not - * @param qinfo + * @param tinfo * @return */ -int32_t qIsTaskCompleted(qTaskInfo_t qinfo); +int32_t qIsTaskCompleted(qTaskInfo_t tinfo); /** * destroy query info structure * @param qHandle */ -void qDestroyTask(qTaskInfo_t qHandle); +void qDestroyTask(qTaskInfo_t tinfo); /** * Get the queried table uid * @param qHandle * @return */ -int64_t qGetQueriedTableUid(qTaskInfo_t qHandle); +int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); /** * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. @@ -145,7 +145,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t * @param type operation type: ADD|DROP * @return */ -int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type); +int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); //================================================================================================ // query handle management diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 49bf42f383..ccc1620264 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,12 +14,50 @@ */ #include "executor.h" +#include "tq.h" +#include "executorimpl.h" #include "planner.h" -void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {} +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) { + ASSERT(pOperator != NULL); + if (pOperator->operatorType != OP_StreamScan) { + if (pOperator->numOfDownstream > 0) { -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) { - if (pMsg == NULL || pStreamBlockReadHandle == NULL) { + if (pOperator->numOfDownstream > 1) { // not handle this in join query + return TSDB_CODE_QRY_APP_ERROR; + } + + return doSetStreamBlock(pOperator->pDownstream[0], input); + } + } else { + SStreamBlockScanInfo* pInfo = pOperator->info; + tqReadHandleSetMsg(pInfo->readerHandle, input, 0); + return TSDB_CODE_SUCCESS; + } +} + +int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { + if (tinfo == NULL) { + return TSDB_CODE_QRY_APP_ERROR; + } + + if (input == NULL) { + return TSDB_CODE_SUCCESS; + } + + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + } else { + qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + } + + return code; +} + +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { + if (pMsg == NULL || streamReadHandle == NULL) { return NULL; } @@ -37,7 +75,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockRead } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0ed480ed15..3b01c319e4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5407,7 +5407,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5417,10 +5417,21 @@ SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32 return NULL; } - pInfo->readerHandle = pStreamBlockHandle; + int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo); + SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + + taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); + } + + // TODO set the extract column id to streamHandle + // pColList + + pInfo->readerHandle = streamReadHandle; pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = OP_StreamBlockScan; + pOperator->operatorType = OP_StreamScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -7704,6 +7715,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask } else if (pPhyNode->info.type == OP_Exchange) { SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); + } else if (pPhyNode->info.type == OP_StreamScan) { + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo); } } From ecdd6784f6f20e24015709ed0b5e23ec2eefd273 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 15:36:24 +0800 Subject: [PATCH 4/6] [td-11818] refactor. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/src/executor.c | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 52c541dcfd..59c9828693 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -635,7 +635,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; void* task = pHandle->buffer.output[pos].task; - qStreamExecTaskSetInput(task, pCont); + qSetStreamInput(task, pCont); SSDataBlock* pDataBlock; uint64_t ts; if (qExecTask(task, &pDataBlock, &ts) < 0) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ccc1620264..ee96ac4a71 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -18,17 +18,20 @@ #include "executorimpl.h" #include "planner.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) { ASSERT(pOperator != NULL); if (pOperator->operatorType != OP_StreamScan) { - if (pOperator->numOfDownstream > 0) { - - if (pOperator->numOfDownstream > 1) { // not handle this in join query - return TSDB_CODE_QRY_APP_ERROR; - } - - return doSetStreamBlock(pOperator->pDownstream[0], input); + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId); + return TSDB_CODE_QRY_APP_ERROR; } + + if (pOperator->numOfDownstream > 1) { // not handle this in join query + qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId); + return TSDB_CODE_QRY_APP_ERROR; + } + + return doSetStreamBlock(pOperator->pDownstream[0], input, reqId); } else { SStreamBlockScanInfo* pInfo = pOperator->info; tqReadHandleSetMsg(pInfo->readerHandle, input, 0); @@ -46,7 +49,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { } SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input); + + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); } else { From 8a4444e5e1510424434f4c7bbd7622a84e9e4d09 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 15:42:12 +0800 Subject: [PATCH 5/6] [td-11818] Remove printf. --- source/client/src/clientImpl.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 42f9378a4e..159a92b0ab 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -666,8 +666,6 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { if (pMsg->contLen > 0) { buf.pData = calloc(1, pMsg->contLen); - printf("create------------>%p\n", buf.pData); - if (buf.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; pMsg->code = TSDB_CODE_OUT_OF_MEMORY; From 84ca79c5d68566c3f3ba850c057cf3acd6e2265c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 15:52:25 +0800 Subject: [PATCH 6/6] [td-11818] Add log for create table. --- source/dnode/vnode/src/vnd/vnodeWrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index d1b529f7fb..b109608193 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -84,7 +84,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error } - vTrace("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name); + vInfo("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name); free(pCreateTbReq->name); if (pCreateTbReq->type == TD_SUPER_TABLE) { free(pCreateTbReq->stbCfg.pSchema);