[td-11818] fix memory leak and refactor code.

This commit is contained in:
Haojun Liao 2022-01-05 14:09:02 +08:00
parent 4d33cb8f64
commit 27224db24b
12 changed files with 240 additions and 167 deletions

View File

@ -43,7 +43,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
bool qIsDdlQuery(const SQueryNode* pQuery); bool qIsDdlQuery(const SQueryNode* pQuery);
void qDestroyQuery(SQueryNode* pQuery); void qDestroyQuery(SQueryNode* pQueryNode);
/** /**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that

View File

@ -192,13 +192,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(pSendMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
pRequest->type = pQuery->type; pRequest->type = pQueryNode->type;
return qCreateQueryDag(pQuery, pDag, pRequest->requestId); return qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
@ -364,8 +363,6 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg); printf("failed to connect to server, reason: %s\n\n", errorMsg);
@ -456,17 +453,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
} }
SDataBuf buf = {.len = pMsg->contLen}; SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
buf.pData = calloc(1, pMsg->contLen);
if (buf.pData == NULL) { if (pMsg->contLen > 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; buf.pData = calloc(1, pMsg->contLen);
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; if (buf.pData == NULL) {
} else { terrno = TSDB_CODE_OUT_OF_MEMORY;
memcpy(buf.pData, pMsg->pCont, pMsg->contLen); pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
}
} }
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
} }
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {

View File

@ -272,6 +272,7 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL); assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
free(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);

View File

@ -171,7 +171,7 @@ TEST(testCase, create_db_Test) {
} }
taos_close(pConn); taos_close(pConn);
} }
//
//TEST(testCase, create_dnode_Test) { //TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -190,7 +190,7 @@ TEST(testCase, create_db_Test) {
// //
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, drop_dnode_Test) { //TEST(testCase, drop_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -250,36 +250,36 @@ TEST(testCase, create_db_Test) {
//// taos_close(pConn); //// taos_close(pConn);
//} //}
// TEST(testCase, create_stable_Test) { TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// } }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL); ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -293,7 +293,7 @@ TEST(testCase, create_db_Test) {
// // // //
// // taos_close(pConn); // // taos_close(pConn);
//} //}
//
//TEST(testCase, create_ctable_Test) { //TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -304,12 +304,12 @@ TEST(testCase, create_db_Test) {
// } // }
// taos_free_result(pRes); // taos_free_result(pRes);
// //
//// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); // pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
//// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); // printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
//// } // }
//// //
//// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
// //
@ -343,7 +343,7 @@ TEST(testCase, create_db_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, show_vgroup_Test) { //TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -376,7 +376,7 @@ TEST(testCase, create_db_Test) {
// //
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, drop_stable_Test) { //TEST(testCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -464,66 +464,73 @@ TEST(testCase, create_db_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
//TEST(testCase, create_multiple_tables) { TEST(testCase, create_multiple_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s", taos_errstr(pRes));
// pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) { taos_close(pConn);
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); return;
// taos_free_result(pRes); }
// ASSERT_TRUE(false);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
// for(int32_t i = 0; i < 10000; ++i) {
// char sql[512] = {0};
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
// TAOS_RES* pres = taos_query(pConn, sql);
// if (taos_errno(pres) != 0) {
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
// }
// taos_free_result(pres);
// } // }
taos_close(pConn);
}
//TEST(testCase, generated_request_id_test) {
// SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
// //
// taos_free_result(pRes); //// for(int32_t i = 0; i < 1000000; ++i) {
// pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); //// uint64_t v = generateRequestId();
// if (taos_errno(pRes) != 0) { //// void* result = taosHashGet(phash, &v, sizeof(v));
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); //// ASSERT_EQ(result, nullptr);
// taos_free_result(pRes); //// taosHashPut(phash, &v, sizeof(v), NULL, 0);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
//
//// for(int32_t i = 0; i < 10000; ++i) {
//// char sql[512] = {0};
//// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
//// TAOS_RES* pres = taos_query(pConn, sql);
//// if (taos_errno(pres) != 0) {
//// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
//// }
//// taos_free_result(pres);
//// } //// }
// //
// taos_close(pConn); // taosHashCleanup(phash);
//} //}
TEST(testCase, generated_request_id_test) {
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
// for(int32_t i = 0; i < 1000000; ++i) {
// uint64_t v = generateRequestId();
// void* result = taosHashGet(phash, &v, sizeof(v));
// ASSERT_EQ(result, nullptr);
// taosHashPut(phash, &v, sizeof(v), NULL, 0);
// }
taosHashCleanup(phash);
}
//TEST(testCase, projection_query_tables) { //TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr); // ASSERT_EQ(pConn, nullptr);

View File

@ -214,7 +214,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet epSet; SEpSet epSet;
ctgGenEpSet(&epSet, vgroupInfo); ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {

View File

@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
taosArrayDestroy(pInfo->funcs); taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) { if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(&pInfo->sub); destroyAllSqlNode(&pInfo->sub);
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) { } else if (pInfo->type == TSDB_SQL_CREATE_STABLE || pInfo->type == TSDB_SQL_CREATE_TABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem); taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);

View File

@ -330,7 +330,7 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SDropTableMsg); *len = sizeof(SDropStbMsg);
return pDropTableMsg; return pDropTableMsg;
} }

View File

@ -1,5 +1,5 @@
#include <tmsg.h> #include "tmsg.h"
#include <ttime.h> #include "ttime.h"
#include "astToMsg.h" #include "astToMsg.h"
#include "parserInt.h" #include "parserInt.h"
#include "parserUtil.h" #include "parserUtil.h"
@ -248,16 +248,10 @@ static int32_t validateTableColumns(SArray* pFieldList, int32_t maxRowLength, in
} }
static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) { static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) {
assert(pFieldList != NULL); assert(pFieldList != NULL && pMsgBuf != NULL);
const char* msg1 = "first column must be timestamp"; const char* msg1 = "first column must be timestamp";
const char* msg2 = "row length exceeds max length"; const char* msg2 = "illegal number of columns";
const char* msg3 = "duplicated column names";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid binary/nchar column length";
const char* msg6 = "invalid column name";
const char* msg7 = "too many columns";
const char* msg8 = "illegal number of columns";
// first column must be timestamp // first column must be timestamp
SField* pField = taosArrayGet(pFieldList, 0); SField* pField = taosArrayGet(pFieldList, 0);
@ -268,7 +262,7 @@ static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) {
// number of fields no less than 2 // number of fields no less than 2
size_t numOfCols = taosArrayGetSize(pFieldList); size_t numOfCols = taosArrayGetSize(pFieldList);
if (numOfCols <= 1) { if (numOfCols <= 1) {
return buildInvalidOperationMsg(pMsgBuf, msg8); return buildInvalidOperationMsg(pMsgBuf, msg2);
} }
return validateTableColumns(pFieldList, TSDB_MAX_BYTES_PER_ROW, TSDB_MAX_COLUMNS, pMsgBuf); return validateTableColumns(pFieldList, TSDB_MAX_BYTES_PER_ROW, TSDB_MAX_COLUMNS, pMsgBuf);
@ -326,6 +320,8 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info; SVgroupInfo info;
} SVgroupTablesBatch; } SVgroupTablesBatch;
static SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap);
static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder, static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder,
SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) { SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) {
const char* msg1 = "illegal value or data overflow"; const char* msg1 = "illegal value or data overflow";
@ -351,6 +347,39 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
return code; return code;
} }
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const SName* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(pTableName));
req.ctbCfg.suid = suid;
req.ctbCfg.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = *pVgInfo;
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
assert(pVgInfo->vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req);
}
}
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->name);
tfree(pTableReq->ctbCfg.pTag);
}
taosArrayDestroy(pTbBatch->req.pArray);
}
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
@ -519,36 +548,32 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SVgroupInfo info = {0}; SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
struct SVCreateTbReq req = {0}; addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info);
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->uid;
req.ctbCfg.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = info;
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
assert(info.vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req);
}
tfree(pSuperTableMeta); tfree(pSuperTableMeta);
} }
// TODO: serialize and SArray *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = (char*) pStmtInfo;
*len = sizeof(SVnodeModifOpStmtInfo);
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch* pTbBatch = NULL; SVgroupTablesBatch* pTbBatch = NULL;
do { do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) break; if (pTbBatch == NULL) {
break;
}
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen); void* buf = malloc(tlen);
@ -569,17 +594,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray); pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData); taosArrayPush(pBufArray, &pVgData);
destroyCreateTbReqBatch(pTbBatch);
} while (true); } while (true);
return pBufArray;
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = (char*) pStmtInfo;
*len = sizeof(SVnodeModifOpStmtInfo);
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
} }
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
@ -855,14 +872,14 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBas
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
SVnodeModifOpStmtInfo* pInsertStmt = NULL; SVnodeModifOpStmtInfo* pModifSqlStmt = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pModifSqlStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt); tfree(pModifSqlStmt);
return NULL; return NULL;
} }
return pInsertStmt; return pModifSqlStmt;
} }

View File

@ -240,6 +240,10 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) {
taosArrayDestroy(pMetaReq->pUdf); taosArrayDestroy(pMetaReq->pUdf);
} }
void qDestroyQuery(SQueryNode* pQuery) { void qDestroyQuery(SQueryNode* pQueryNode) {
tfree(pQuery); if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks);
}
tfree(pQueryNode);
} }

View File

@ -392,6 +392,18 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
} }
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
if (pQueryNode->info.type == QNODE_MODIFY) {
SDataPayloadInfo* pInfo = pQueryNode->pExtInfo;
size_t size = taosArrayGetSize(pInfo->payload);
for (int32_t i = 0; i < size; ++i) {
SVgDataBlocks* pBlock = taosArrayGetP(pInfo->payload, i);
tfree(pBlock);
}
taosArrayDestroy(pInfo->payload);
}
tfree(pQueryNode->pExtInfo); tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema); tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name); tfree(pQueryNode->info.name);

View File

@ -305,10 +305,10 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode); vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pDataSink = createDataInserter(pCxt, blocks); subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->pNode = NULL; subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY; subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType; subplan->msgType = pPayload->msgType;
subplan->id.queryId = pCxt->pDag->queryId; subplan->id.queryId = pCxt->pDag->queryId;
RECOVERY_CURRENT_SUBPLAN(pCxt); RECOVERY_CURRENT_SUBPLAN(pCxt);

View File

@ -16,12 +16,44 @@
#include "parser.h" #include "parser.h"
#include "plannerInt.h" #include "plannerInt.h"
static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) {
return;
}
tfree(pSinkNode);
}
void qDestroySubplan(SSubplan* pSubplan) { void qDestroySubplan(SSubplan* pSubplan) {
// todo if (pSubplan == NULL) {
return;
}
taosArrayDestroy(pSubplan->pChildren);
taosArrayDestroy(pSubplan->pParents);
destroyDataSinkNode(pSubplan->pDataSink);
// todo destroy pNode
tfree(pSubplan);
} }
void qDestroyQueryDag(struct SQueryDag* pDag) { void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo if (pDag == NULL) {
return;
}
size_t size = taosArrayGetSize(pDag->pSubplans);
for(size_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pDag->pSubplans, i);
size_t t = taosArrayGetSize(pa);
for(int32_t j = 0; j < t; ++j) {
SSubplan* pSubplan = taosArrayGetP(pa, j);
qDestroySubplan(pSubplan);
}
taosArrayDestroy(pa);
}
taosArrayDestroy(pDag->pSubplans);
tfree(pDag);
} }
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) { int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {