From d862dcae8387d85fdb620b31467b437dc877d722 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jan 2022 12:58:39 +0800 Subject: [PATCH] [td-11818] refactor, fix bug in repeat create table, add request id for each query. --- include/libs/planner/planner.h | 2 +- include/os/osSysinfo.h | 2 - source/client/inc/clientInt.h | 2 + source/client/src/clientEnv.c | 34 ++++++- source/client/src/clientImpl.c | 3 +- source/client/test/clientTests.cpp | 135 +++++++++++++++---------- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 5 +- source/libs/planner/src/planner.c | 4 +- source/libs/transport/src/rpcMain.c | 2 +- source/os/src/osSysinfo.c | 4 - 11 files changed, 126 insertions(+), 69 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 5ad53703c7..a310b04e9e 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -141,7 +141,7 @@ struct SQueryNode; /** * Create the physical plan for the query, according to the AST. */ -int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag); +int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 6952b91742..b410255ea4 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -68,8 +68,6 @@ typedef struct { SysNameInfo taosGetSysNameInfo(); -int64_t taosGetPid(); - #ifdef __cplusplus } #endif diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 7aa4bc9ee9..705d6ef786 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -146,6 +146,8 @@ int taos_init(); void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo); void destroyTscObj(void*pObj); +uint64_t generateRequestId(); + void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void destroyRequest(SRequestObj* pRequest); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 960ba95324..bfb884c57e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -87,6 +87,7 @@ static void tscInitLogFile() { } } +// todo close the transporter properly void closeTransporter(STscObj* pTscObj) { if (pTscObj == NULL || pTscObj->pTransporter == NULL) { return; @@ -166,8 +167,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty return NULL; } - // TODO generated request uuid - pRequest->requestId = 0; + pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampMs(); pRequest->type = type; @@ -410,6 +410,36 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return 0; } +/** + * The request id is an unsigned integer format of 64bit. + *+------------+-----+-----------+---------------+ + *| uid|localIp| PId | timestamp | serial number | + *+------------+-----+-----------+---------------+ + *| 16bit |12bit|20bit |16bit | + *+------------+-----+-----------+---------------+ + * @return + */ +static int32_t requestSerialId = 0; +uint64_t generateRequestId() { + uint64_t hashId = 0; + + char uid[64] = {0}; + int32_t code = taosGetSystemUid(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno))); + + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + + int64_t ts = taosGetTimestampUs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + + uint64_t id = ((hashId & 0xFFFF) << 48) | ((pid & 0x0FFF) << 36) | ((ts & 0xFFFFF) << 16) | (val & 0xFFFF); + return id; +} + #if 0 #include "cJSON.h" static setConfRet taos_set_config_imp(const char *config){ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b0b2c57ee4..0d590235ad 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -198,13 +198,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { pRequest->type = pQuery->type; - return qCreateQueryDag(pQuery, pDag); + return qCreateQueryDag(pQuery, pDag, pRequest->requestId); } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); } + return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ae1a36bd36..bb40d9ada2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -147,29 +147,29 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -//TEST(testCase, create_db_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create database abc1 vgroups 4"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_close(pConn); -//} +TEST(testCase, create_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database abc1 vgroups 4"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_close(pConn); +} // //TEST(testCase, create_dnode_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -249,36 +249,36 @@ TEST(testCase, connect_Test) { //// taos_close(pConn); //} -// TEST(testCase, create_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); -// if (taos_errno(pRes) != 0) { -// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); -// } -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + TEST(testCase, create_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + taos_close(pConn); +} //TEST(testCase, create_table_Test) { // // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -470,7 +470,15 @@ TEST(testCase, create_multiple_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)"); + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); if (taos_errno(pRes) != 0) { printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -491,6 +499,25 @@ TEST(testCase, create_multiple_tables) { taos_close(pConn); } +TEST(testCase, generated_request_id_test) { + uint64_t id0 = generateRequestId(); + + uint64_t id1 = generateRequestId(); + uint64_t id2 = generateRequestId(); + uint64_t id3 = generateRequestId(); + uint64_t id4 = generateRequestId(); + + ASSERT_NE(id0, id1); + ASSERT_NE(id1, id2); + ASSERT_NE(id2, id3); + ASSERT_NE(id4, id3); + ASSERT_NE(id0, id2); + ASSERT_NE(id0, id4); + ASSERT_NE(id0, id3); + +// SHashObj *phash = taosHashInit() +} + //TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_EQ(pConn, nullptr); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 4b248f90e7..ebd75b9a82 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -105,7 +105,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); */ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); -int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index a53cbe836d..4ff9881ccd 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -309,6 +309,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan subplan->pNode = NULL; subplan->type = QUERY_TYPE_MODIFY; subplan->msgType = pPayload->msgType; + subplan->id.queryId = pCxt->pDag->queryId; RECOVERY_CURRENT_SUBPLAN(pCxt); } @@ -328,7 +329,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { // todo deal subquery } -int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) { +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) { TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { .pCatalog = pCatalog, @@ -338,6 +339,8 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD }; *pDag = context.pDag; + context.pDag->queryId = requestId; + context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); createSubplanByLevel(&context, pQueryNode); } CATCH(code) { diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 9621c2e227..388a57c7e3 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -24,7 +24,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { // todo } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) { SQueryPlanNode* logicPlan; int32_t code = createQueryPlan(pNode, &logicPlan); if (TSDB_CODE_SUCCESS != code) { @@ -38,7 +38,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) return code; } - code = createDag(logicPlan, NULL, pDag); + code = createDag(logicPlan, NULL, pDag, requestId); if (TSDB_CODE_SUCCESS != code) { destroyQueryPlan(logicPlan); qDestroyQueryDag(*pDag); diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 19fe803035..310944e9b6 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -700,7 +700,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->sid = sid; pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); pConn->ownId = htonl(pConn->sid); - pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPid() + (int64_t)pConn->tranId); + pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId); pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index cb231e15a0..df006f44eb 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -1130,8 +1130,4 @@ SysNameInfo taosGetSysNameInfo() { return info; } -int64_t taosGetPid() { - getpid(); -} - #endif \ No newline at end of file