[td-11818] fix memory leak and refactor.

This commit is contained in:
Haojun Liao 2022-01-05 17:49:15 +08:00
parent 27224db24b
commit 7916c1e622
11 changed files with 335 additions and 276 deletions

View File

@ -423,7 +423,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 16bit |12bit|20bit |16bit |
*| 12bit |12bit|24bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
@ -443,11 +443,11 @@ uint64_t generateRequestId() {
}
}
int64_t ts = taosGetTimestampUs();
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0xFFFF) << 48) | ((pid & 0x0FFF) << 36) | ((ts & 0xFFFFF) << 16) | (val & 0xFFFF);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}

View File

@ -540,7 +540,6 @@ void* doFetchRow(SRequestObj* pRequest) {
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) {

View File

@ -57,96 +57,96 @@ TEST(testCase, connect_Test) {
taos_close(pConn);
}
//TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, create_account_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, drop_account_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, show_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show users");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, drop_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, show_db_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show databases");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -172,61 +172,61 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
//TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000");
// if (taos_errno(pRes) != 0) {
// printf("error in create dnode, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000");
// if (taos_errno(pRes) != 0) {
// printf("failed to create dnode, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
TEST(testCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
//TEST(testCase, drop_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop dnode 2");
// if (taos_errno(pRes) != 0) {
// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, use_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_close(pConn);
//}
TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000");
if (taos_errno(pRes) != 0) {
printf("error in create dnode, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000");
if (taos_errno(pRes) != 0) {
printf("failed to create dnode, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop dnode 2");
if (taos_errno(pRes) != 0) {
printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
//TEST(testCase, drop_db_test) {
//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -281,18 +281,18 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
//TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// // assert(pConn != NULL);
// //
// // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// // taos_free_result(pRes);
// //
// // pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// // taos_free_result(pRes);
// //
// // taos_close(pConn);
//}
TEST(testCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -518,18 +518,21 @@ TEST(testCase, create_multiple_tables) {
taos_close(pConn);
}
//TEST(testCase, generated_request_id_test) {
// SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
//
//// for(int32_t i = 0; i < 1000000; ++i) {
//// uint64_t v = generateRequestId();
//// void* result = taosHashGet(phash, &v, sizeof(v));
//// ASSERT_EQ(result, nullptr);
//// taosHashPut(phash, &v, sizeof(v), NULL, 0);
//// }
//
// taosHashCleanup(phash);
//}
TEST(testCase, generated_request_id_test) {
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for(int32_t i = 0; i < 50000000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) {
printf("0x%"PRIx64", index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
}
taosHashCleanup(phash);
}
//TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);

View File

@ -10,7 +10,7 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);

View File

@ -38,14 +38,6 @@ typedef struct SMsgBuf {
char *buf;
} SMsgBuf;
// create table operation type
enum TSQL_CREATE_TABLE_TYPE {
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_CTABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
void clearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
void clearAllTableMetaInfo(SQueryStmtInfo* pQueryInfo, bool removeMeta, uint64_t id);

View File

@ -370,7 +370,7 @@ create_table_list(A) ::= create_from_stable(Z). {
pCreateTable->childTableInfo = taosArrayInit(4, sizeof(SCreatedTableInfo));
taosArrayPush(pCreateTable->childTableInfo, &Z);
pCreateTable->type = TSQL_CREATE_CTABLE;
pCreateTable->type = TSDB_SQL_CREATE_TABLE;
A = pCreateTable;
}
@ -381,7 +381,7 @@ create_table_list(A) ::= create_table_list(X) create_from_stable(Z). {
%type create_table_args{SCreateTableSql*}
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
A = tSetCreateTableInfo(X, NULL, NULL, TSQL_CREATE_TABLE);
A = tSetCreateTableInfo(X, NULL, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n;
@ -391,7 +391,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
// create super table
%type create_stable_args{SCreateTableSql*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
A = tSetCreateTableInfo(X, Y, NULL, TSDB_SQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
V.n += Z.n;
@ -421,11 +421,11 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(STo
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
// A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
// setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
//
// V.n += Z.n;
// setCreatedTableName(pInfo, &V, &U);
}
%type column{SField}

View File

@ -579,25 +579,21 @@ SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSe
SCreateTableSql *pCreate = calloc(1, sizeof(SCreateTableSql));
switch (type) {
case TSQL_CREATE_TABLE: {
case TSDB_SQL_CREATE_TABLE: {
pCreate->colInfo.pColumns = pCols;
assert(pTags == NULL);
break;
}
case TSQL_CREATE_STABLE: {
case TSDB_SQL_CREATE_STABLE: {
pCreate->colInfo.pColumns = pCols;
pCreate->colInfo.pTagColumns = pTags;
assert(pTags != NULL && pCols != NULL);
break;
}
case TSQL_CREATE_STREAM: {
pCreate->pSelect = pSelect;
break;
}
case TSQL_CREATE_CTABLE: {
assert(0);
}
// case TSQL_CREATE_STREAM: {
// pCreate->pSelect = pSelect;
// break;
// }
default:
assert(false);

View File

@ -230,7 +230,7 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt
return pCreateMsg;
}
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
@ -239,16 +239,16 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
}
SCreateStbMsg* pCreateTableMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
SCreateStbMsg* pCreateStbMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
char* pMsg = NULL;
#if 0
int32_t tableType = pCreateTableSql->type;
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value
#if 0
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
pCreateTableMsg->numOfTables = htonl(numOfTables);
pCreateStbMsg->numOfTables = htonl(numOfTables);
pMsg = (char*)pCreateMsg;
for (int32_t i = 0; i < numOfTables; ++i) {
@ -268,25 +268,27 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
}
} else {
#endif
} else { // create (super) table
// create (super) table
SName n = {0};
int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf);
if (code != 0) {
return NULL;
}
code = tNameExtractFullName(&n, pCreateTableMsg->name);
code = tNameExtractFullName(&n, pCreateStbMsg->name);
if (code != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
pCreateTableMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateTableMsg->numOfColumns = htonl(numOfCols);
pCreateTableMsg->numOfTags = htonl(numOfTags);
pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateStbMsg->numOfColumns = htonl(numOfCols);
pCreateStbMsg->numOfTags = htonl(numOfTags);
pSchema = (SSchema*) pCreateTableMsg->pSchema;
pSchema = (SSchema*)pCreateStbMsg->pSchema;
for (int i = 0; i < numOfCols; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
@ -306,12 +308,11 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
}
pMsg = (char*)pSchema;
}
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg);
*len = msgLen;
return pCreateTableMsg;
return pCreateStbMsg;
}
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {

View File

@ -1,10 +1,12 @@
#include "tmsg.h"
#include "ttime.h"
#include <astGenerator.h>
#include <tmsg.h>
#include "astToMsg.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
/* is contained in pFieldList or not */
static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
@ -291,11 +293,9 @@ static int32_t validateTagParams(SArray* pTagsList, SArray* pFieldList, SMsgBuf*
return validateTableColumns(pFieldList, TSDB_MAX_TAGS_LEN, TSDB_MAX_TAGS, pMsgBuf);
}
int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
int32_t doCheckForCreateTable(SCreateTableSql* pCreateTable, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid table name";
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SArray* pFieldList = pCreateTable->colInfo.pColumns;
SArray* pTagList = pCreateTable->colInfo.pTagColumns;
assert(pFieldList != NULL);
@ -374,20 +374,25 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->name);
tfree(pTableReq->ctbCfg.pTag);
if (pTableReq->type == TSDB_NORMAL_TABLE) {
tfree(pTableReq->ntbCfg.pSchema);
} else if (pTableReq->type == TSDB_CHILD_TABLE) {
tfree(pTableReq->ctbCfg.pTag);
} else {
assert(0);
}
}
taosArrayDestroy(pTbBatch->req.pArray);
}
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, SArray** pBufArray) {
const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow";
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// super table name, create table by using dst
@ -545,6 +550,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
return code;
}
// Find a appropriate vgroup to accommodate this table , according to the table name
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
@ -552,16 +558,103 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
tfree(pSuperTableMeta);
}
SArray *pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
*pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
}
static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
struct SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = strdup(tNameGetTableName(pTableName));
req.ntbCfg.nCols = taosArrayGetSize(pColumns);
int32_t num = req.ntbCfg.nCols;
req.ntbCfg.pSchema = calloc(num, sizeof(SSchema));
for(int32_t i = 0; i < num; ++i) {
SSchema* pSchema = taosArrayGet(pColumns, i);
memcpy(&req.ntbCfg.pSchema[i], pSchema, sizeof(SSchema));
}
pBatch->info = *pVgroupInfo;
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (pBatch->req.pArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
taosArrayPush(pBatch->req.pArray, &req);
return TSDB_CODE_SUCCESS;
}
int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
SArray* pBufArray = NULL;
// it is a sql statement to create a normal table
if (pCreateTable->childTableInfo == NULL) {
assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL);
int32_t code = doCheckForCreateTable(pCreateTable, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SName tableName = {0};
code = createSName(&tableName, &pCreateTable->name, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
SVgroupTablesBatch tbatch = {0};
code = doBuildSingleTableBatchReq(&tableName, pCreateTable->colInfo.pColumns, &info, &tbatch);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pBufArray = taosArrayInit(1, POINTER_BYTES);
if (pBufArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
destroyCreateTbReqBatch(&tbatch);
} else { // it is a child table, created according to a super table
doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
}
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = (char*) pStmtInfo;
*len = sizeof(SVnodeModifOpStmtInfo);
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
@ -575,27 +668,10 @@ SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap) {
break;
}
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
/*int32_t code = */serializeVgroupTablesBatchImpl(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
return pBufArray;
}
@ -805,21 +881,13 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
case TSDB_SQL_CREATE_STABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
terrno = code;
goto _error;
}
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
if ((code = doCheckForCreateTable(pCreateTable, pMsgBuf)) != TSDB_CODE_SUCCESS) {
terrno = code;
goto _error;
}
pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = TDMT_MND_CREATE_STB;
break;
}
@ -867,7 +935,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
assert(pCreateTable->type == TSDB_SQL_CREATE_TABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
@ -875,7 +943,7 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBas
SVnodeModifOpStmtInfo* pModifSqlStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pModifSqlStmt, &msgLen);
int32_t code = doCheckAndBuildCreateTableReq(pCreateTable, pCtx, pMsgBuf, (char**) &pModifSqlStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pModifSqlStmt);
return NULL;

View File

@ -44,21 +44,21 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
}
if (!isDqlSqlStatement(&info)) {
bool toVnode = false;
// bool toVnode = false;
if (info.type == TSDB_SQL_CREATE_TABLE) {
SCreateTableSql* pCreateSql = info.pCreateTableInfo;
if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
}
// SCreateTableSql* pCreateSql = info.pCreateTableInfo;
// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
// toVnode = true;
// }
// }
if (toVnode) {
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pInsertInfo == NULL) {
// if (toVnode) {
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pModifStmtInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
*pQuery = (SQueryNode*)pModifStmtInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {

View File

@ -2642,7 +2642,7 @@ static void yy_reduce(
pCreateTable->childTableInfo = taosArrayInit(4, sizeof(SCreatedTableInfo));
taosArrayPush(pCreateTable->childTableInfo, &yymsp[0].minor.yy150);
pCreateTable->type = TSQL_CREATE_CTABLE;
pCreateTable->type = TSDB_SQL_CREATE_TABLE;
yylhsminor.yy326 = pCreateTable;
}
yymsp[0].minor.yy326 = yylhsminor.yy326;
@ -2656,7 +2656,7 @@ static void yy_reduce(
break;
case 140: /* create_table_args ::= ifnotexists ids cpxName LP columnlist RP */
{
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-1].minor.yy165, NULL, NULL, TSQL_CREATE_TABLE);
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-1].minor.yy165, NULL, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
@ -2666,7 +2666,7 @@ static void yy_reduce(
break;
case 141: /* create_stable_args ::= ifnotexists ids cpxName LP columnlist RP TAGS LP columnlist RP */
{
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-5].minor.yy165, yymsp[-1].minor.yy165, NULL, TSQL_CREATE_STABLE);
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-5].minor.yy165, yymsp[-1].minor.yy165, NULL, TSDB_SQL_CREATE_STABLE);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_STABLE);
yymsp[-8].minor.yy0.n += yymsp[-7].minor.yy0.n;
@ -2700,11 +2700,11 @@ static void yy_reduce(
break;
case 146: /* create_table_args ::= ifnotexists ids cpxName AS select */
{
yylhsminor.yy326 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy278, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
// yylhsminor.yy326 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy278, TSQL_CREATE_STREAM);
// setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
//
// yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
// setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
}
yymsp[-4].minor.yy326 = yylhsminor.yy326;
break;