From 27224db24b09d8ad97e94b5aa56ac801ffe8f301 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Jan 2022 14:09:02 +0800 Subject: [PATCH] [td-11818] fix memory leak and refactor code. --- include/libs/parser/parser.h | 2 +- source/client/src/clientImpl.c | 27 ++-- source/client/src/clientMsgHandler.c | 1 + source/client/test/clientTests.cpp | 197 +++++++++++++------------ source/libs/catalog/src/catalog.c | 1 - source/libs/parser/src/astGenerator.c | 2 +- source/libs/parser/src/astToMsg.c | 2 +- source/libs/parser/src/dCDAstProcess.c | 111 ++++++++------ source/libs/parser/src/parser.c | 8 +- source/libs/planner/src/logicPlan.c | 12 ++ source/libs/planner/src/physicalPlan.c | 8 +- source/libs/planner/src/planner.c | 36 ++++- 12 files changed, 240 insertions(+), 167 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index a9e1f26d20..5bd18641bf 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -43,7 +43,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); bool qIsDdlQuery(const SQueryNode* pQuery); -void qDestroyQuery(SQueryNode* pQuery); +void qDestroyQuery(SQueryNode* pQueryNode); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a967a65c44..2bc276b0af 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -192,13 +192,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { } tsem_wait(&pRequest->body.rspSem); - destroySendMsgInfo(pSendMsg); return TSDB_CODE_SUCCESS; } -int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { - pRequest->type = pQuery->type; - return qCreateQueryDag(pQuery, pDag, pRequest->requestId); +int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { + pRequest->type = pQueryNode->type; + return qCreateQueryDag(pQueryNode, pDag, pRequest->requestId); } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { @@ -364,8 +363,6 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); - destroySendMsgInfo(body); - if (pRequest->code != TSDB_CODE_SUCCESS) { const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); printf("failed to connect to server, reason: %s\n\n", errorMsg); @@ -456,17 +453,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.len = pMsg->contLen}; - buf.pData = calloc(1, pMsg->contLen); - if (buf.pData == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - pMsg->code = TSDB_CODE_OUT_OF_MEMORY; - } else { - memcpy(buf.pData, pMsg->pCont, pMsg->contLen); + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; + + if (pMsg->contLen > 0) { + buf.pData = calloc(1, pMsg->contLen); + if (buf.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(buf.pData, pMsg->pCont, pMsg->contLen); + } } pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); rpcFreeCont(pMsg->pCont); + destroySendMsgInfo(pSendInfo); } TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 017fa493cf..5ac5d871c9 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -272,6 +272,7 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; + free(pMsg->pData); if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 2bf3232b26..e6759115ed 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -171,7 +171,7 @@ TEST(testCase, create_db_Test) { } taos_close(pConn); } -// + //TEST(testCase, create_dnode_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -190,7 +190,7 @@ TEST(testCase, create_db_Test) { // // taos_close(pConn); //} -// + //TEST(testCase, drop_dnode_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -250,36 +250,36 @@ TEST(testCase, create_db_Test) { //// taos_close(pConn); //} -// TEST(testCase, create_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + TEST(testCase, create_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + taos_close(pConn); +} //TEST(testCase, create_table_Test) { // // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -293,7 +293,7 @@ TEST(testCase, create_db_Test) { // // // // taos_close(pConn); //} -// + //TEST(testCase, create_ctable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -304,12 +304,12 @@ TEST(testCase, create_db_Test) { // } // taos_free_result(pRes); // -//// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); -//// } -//// -//// taos_free_result(pRes); +// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); // taos_close(pConn); //} // @@ -343,7 +343,7 @@ TEST(testCase, create_db_Test) { // taos_free_result(pRes); // taos_close(pConn); //} -// + //TEST(testCase, show_vgroup_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -376,7 +376,7 @@ TEST(testCase, create_db_Test) { // // taos_close(pConn); //} -// + //TEST(testCase, drop_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -464,66 +464,73 @@ TEST(testCase, create_db_Test) { // taos_close(pConn); //} -//TEST(testCase, create_multiple_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + +// for(int32_t i = 0; i < 10000; ++i) { +// char sql[512] = {0}; +// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); +// TAOS_RES* pres = taos_query(pConn, sql); +// if (taos_errno(pres) != 0) { +// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); +// } +// taos_free_result(pres); // } + + taos_close(pConn); +} + +//TEST(testCase, generated_request_id_test) { +// SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); // -// taos_free_result(pRes); -// pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// -//// for(int32_t i = 0; i < 10000; ++i) { -//// char sql[512] = {0}; -//// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); -//// TAOS_RES* pres = taos_query(pConn, sql); -//// if (taos_errno(pres) != 0) { -//// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); -//// } -//// taos_free_result(pres); +//// for(int32_t i = 0; i < 1000000; ++i) { +//// uint64_t v = generateRequestId(); +//// void* result = taosHashGet(phash, &v, sizeof(v)); +//// ASSERT_EQ(result, nullptr); +//// taosHashPut(phash, &v, sizeof(v), NULL, 0); //// } // -// taos_close(pConn); +// taosHashCleanup(phash); //} -TEST(testCase, generated_request_id_test) { - SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - -// for(int32_t i = 0; i < 1000000; ++i) { -// uint64_t v = generateRequestId(); -// void* result = taosHashGet(phash, &v, sizeof(v)); -// ASSERT_EQ(result, nullptr); -// taosHashPut(phash, &v, sizeof(v), NULL, 0); -// } - - taosHashCleanup(phash); -} - //TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_EQ(pConn, nullptr); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index abcfafa786..045c92b586 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -214,7 +214,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE SEpSet epSet; ctgGenEpSet(&epSet, vgroupInfo); - rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { diff --git a/source/libs/parser/src/astGenerator.c b/source/libs/parser/src/astGenerator.c index 3f45012405..288d6296fa 100644 --- a/source/libs/parser/src/astGenerator.c +++ b/source/libs/parser/src/astGenerator.c @@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) { taosArrayDestroy(pInfo->funcs); if (pInfo->type == TSDB_SQL_SELECT) { destroyAllSqlNode(&pInfo->sub); - } else if (pInfo->type == TSDB_SQL_CREATE_STABLE) { + } else if (pInfo->type == TSDB_SQL_CREATE_STABLE || pInfo->type == TSDB_SQL_CREATE_TABLE) { pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index ab8c9b8094..981eac1266 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -330,7 +330,7 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; - *len = sizeof(SDropTableMsg); + *len = sizeof(SDropStbMsg); return pDropTableMsg; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 8ff0636969..d9bfb86028 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -1,5 +1,5 @@ -#include -#include +#include "tmsg.h" +#include "ttime.h" #include "astToMsg.h" #include "parserInt.h" #include "parserUtil.h" @@ -248,16 +248,10 @@ static int32_t validateTableColumns(SArray* pFieldList, int32_t maxRowLength, in } static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) { - assert(pFieldList != NULL); + assert(pFieldList != NULL && pMsgBuf != NULL); const char* msg1 = "first column must be timestamp"; - const char* msg2 = "row length exceeds max length"; - const char* msg3 = "duplicated column names"; - const char* msg4 = "invalid data type"; - const char* msg5 = "invalid binary/nchar column length"; - const char* msg6 = "invalid column name"; - const char* msg7 = "too many columns"; - const char* msg8 = "illegal number of columns"; + const char* msg2 = "illegal number of columns"; // first column must be timestamp SField* pField = taosArrayGet(pFieldList, 0); @@ -268,7 +262,7 @@ static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) { // number of fields no less than 2 size_t numOfCols = taosArrayGetSize(pFieldList); if (numOfCols <= 1) { - return buildInvalidOperationMsg(pMsgBuf, msg8); + return buildInvalidOperationMsg(pMsgBuf, msg2); } return validateTableColumns(pFieldList, TSDB_MAX_BYTES_PER_ROW, TSDB_MAX_COLUMNS, pMsgBuf); @@ -326,6 +320,8 @@ typedef struct SVgroupTablesBatch { SVgroupInfo info; } SVgroupTablesBatch; +static SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap); + static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder, SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) { const char* msg1 = "illegal value or data overflow"; @@ -351,6 +347,39 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT return code; } +static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const SName* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) { + struct SVCreateTbReq req = {0}; + req.type = TD_CHILD_TABLE; + req.name = strdup(tNameGetTableName(pTableName)); + req.ctbCfg.suid = suid; + req.ctbCfg.pTag = row; + + SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); + if (pTableBatch == NULL) { + SVgroupTablesBatch tBatch = {0}; + tBatch.info = *pVgInfo; + + tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); + taosArrayPush(tBatch.req.pArray, &req); + + taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch)); + } else { // add to the correct vgroup + assert(pVgInfo->vgId == pTableBatch->info.vgId); + taosArrayPush(pTableBatch->req.pArray, &req); + } +} + +static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { + size_t size = taosArrayGetSize(pTbBatch->req.pArray); + for(int32_t i = 0; i < size; ++i) { + SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i); + tfree(pTableReq->name); + tfree(pTableReq->ctbCfg.pTag); + } + + taosArrayDestroy(pTbBatch->req.pArray); +} + int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { const char* msg1 = "invalid table name"; const char* msg2 = "tags number not matched"; @@ -519,36 +548,32 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SVgroupInfo info = {0}; catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); - struct SVCreateTbReq req = {0}; - req.type = TD_CHILD_TABLE; - req.name = strdup(tNameGetTableName(&tableName)); - req.ctbCfg.suid = pSuperTableMeta->uid; - req.ctbCfg.pTag = row; - - SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); - if (pTableBatch == NULL) { - SVgroupTablesBatch tBatch = {0}; - tBatch.info = info; - - tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); - taosArrayPush(tBatch.req.pArray, &req); - - taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch)); - } else { // add to the correct vgroup - assert(info.vgId == pTableBatch->info.vgId); - taosArrayPush(pTableBatch->req.pArray, &req); - } - + addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info); tfree(pSuperTableMeta); } - // TODO: serialize and + SArray *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap); + + SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + pStmtInfo->pDataBlocks = pBufArray; + + *pOutput = (char*) pStmtInfo; + *len = sizeof(SVnodeModifOpStmtInfo); + + taosHashCleanup(pVgroupHashmap); + return TSDB_CODE_SUCCESS; +} + +SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap) { SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SVgroupTablesBatch* pTbBatch = NULL; do { pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); - if (pTbBatch == NULL) break; + if (pTbBatch == NULL) { + break; + } int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); void* buf = malloc(tlen); @@ -569,17 +594,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray); taosArrayPush(pBufArray, &pVgData); + destroyCreateTbReqBatch(pTbBatch); } while (true); - - SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); - pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; - pStmtInfo->pDataBlocks = pBufArray; - - *pOutput = (char*) pStmtInfo; - *len = sizeof(SVnodeModifOpStmtInfo); - - taosHashCleanup(pVgroupHashmap); - return TSDB_CODE_SUCCESS; + return pBufArray; } SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { @@ -855,14 +872,14 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBas SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; - SVnodeModifOpStmtInfo* pInsertStmt = NULL; + SVnodeModifOpStmtInfo* pModifSqlStmt = NULL; int32_t msgLen = 0; - int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); + int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pModifSqlStmt, &msgLen); if (code != TSDB_CODE_SUCCESS) { - tfree(pInsertStmt); + tfree(pModifSqlStmt); return NULL; } - return pInsertStmt; + return pModifSqlStmt; } \ No newline at end of file diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index ad248f1795..bdb3dcd3e3 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -240,6 +240,10 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) { taosArrayDestroy(pMetaReq->pUdf); } -void qDestroyQuery(SQueryNode* pQuery) { - tfree(pQuery); +void qDestroyQuery(SQueryNode* pQueryNode) { + if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) { + SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode; + taosArrayDestroy(pModifInfo->pDataBlocks); + } + tfree(pQueryNode); } diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index d04fd716c2..fa7b3776dc 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -392,6 +392,18 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { } static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { + if (pQueryNode->info.type == QNODE_MODIFY) { + SDataPayloadInfo* pInfo = pQueryNode->pExtInfo; + + size_t size = taosArrayGetSize(pInfo->payload); + for (int32_t i = 0; i < size; ++i) { + SVgDataBlocks* pBlock = taosArrayGetP(pInfo->payload, i); + tfree(pBlock); + } + + taosArrayDestroy(pInfo->payload); + } + tfree(pQueryNode->pExtInfo); tfree(pQueryNode->pSchema); tfree(pQueryNode->info.name); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 7f472be756..e7468e44eb 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -305,10 +305,10 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); - subplan->pDataSink = createDataInserter(pCxt, blocks); - subplan->pNode = NULL; - subplan->type = QUERY_TYPE_MODIFY; - subplan->msgType = pPayload->msgType; + subplan->pDataSink = createDataInserter(pCxt, blocks); + subplan->pNode = NULL; + subplan->type = QUERY_TYPE_MODIFY; + subplan->msgType = pPayload->msgType; subplan->id.queryId = pCxt->pDag->queryId; RECOVERY_CURRENT_SUBPLAN(pCxt); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e50f4d02b9..b61c7c390f 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -16,12 +16,44 @@ #include "parser.h" #include "plannerInt.h" +static void destroyDataSinkNode(SDataSink* pSinkNode) { + if (pSinkNode == NULL) { + return; + } + tfree(pSinkNode); +} + void qDestroySubplan(SSubplan* pSubplan) { - // todo + if (pSubplan == NULL) { + return; + } + + taosArrayDestroy(pSubplan->pChildren); + taosArrayDestroy(pSubplan->pParents); + destroyDataSinkNode(pSubplan->pDataSink); + // todo destroy pNode + tfree(pSubplan); } void qDestroyQueryDag(struct SQueryDag* pDag) { - // todo + if (pDag == NULL) { + return; + } + + size_t size = taosArrayGetSize(pDag->pSubplans); + for(size_t i = 0; i < size; ++i) { + SArray* pa = taosArrayGetP(pDag->pSubplans, i); + + size_t t = taosArrayGetSize(pa); + for(int32_t j = 0; j < t; ++j) { + SSubplan* pSubplan = taosArrayGetP(pa, j); + qDestroySubplan(pSubplan); + } + taosArrayDestroy(pa); + } + + taosArrayDestroy(pDag->pSubplans); + tfree(pDag); } int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {