Merge branch '3.0' into feature/qnode

This commit is contained in:
dapan1121 2022-01-04 13:23:53 +08:00 committed by GitHub
commit adc828d2f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 163 additions and 72 deletions

View File

@ -19,7 +19,9 @@
"ms-vscode.cmake-tools", "ms-vscode.cmake-tools",
"austin.code-gnu-global", "austin.code-gnu-global",
"visualstudioexptteam.vscodeintel", "visualstudioexptteam.vscodeintel",
"eamodio.gitlens" "eamodio.gitlens",
"matepek.vscode-catch2-test-adapter",
"spmeesseman.vscode-taskexplorer"
], ],
// Use 'forwardPorts' to make a list of ports inside the container available locally. // Use 'forwardPorts' to make a list of ports inside the container available locally.

View File

@ -142,7 +142,7 @@ struct SQueryNode;
/** /**
* Create the physical plan for the query, according to the AST. * Create the physical plan for the query, according to the AST.
*/ */
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag); int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule

View File

@ -68,8 +68,6 @@ typedef struct {
SysNameInfo taosGetSysNameInfo(); SysNameInfo taosGetSysNameInfo();
int64_t taosGetPid();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -146,6 +146,8 @@ int taos_init();
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj); void destroyTscObj(void*pObj);
uint64_t generateRequestId();
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);

View File

@ -87,6 +87,7 @@ static void tscInitLogFile() {
} }
} }
// todo close the transporter properly
void closeTransporter(STscObj* pTscObj) { void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) { if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
return; return;
@ -166,8 +167,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return NULL; return NULL;
} }
// TODO generated request uuid pRequest->requestId = generateRequestId();
pRequest->requestId = 0;
pRequest->metric.start = taosGetTimestampMs(); pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type; pRequest->type = type;
@ -410,6 +410,36 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0; return 0;
} }
/**
* The request id is an unsigned integer format of 64bit.
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 16bit |12bit|20bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
static int32_t requestSerialId = 0;
uint64_t generateRequestId() {
uint64_t hashId = 0;
char uid[64] = {0};
int32_t code = taosGetSystemUid(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
int64_t ts = taosGetTimestampUs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0xFFFF) << 48) | ((pid & 0x0FFF) << 36) | ((ts & 0xFFFFF) << 16) | (val & 0xFFFF);
return id;
}
#if 0 #if 0
#include "cJSON.h" #include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){ static setConfRet taos_set_config_imp(const char *config){

View File

@ -198,13 +198,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
pRequest->type = pQuery->type; pRequest->type = pQuery->type;
return qCreateQueryDag(pQuery, pDag); return qCreateQueryDag(pQuery, pDag, pRequest->requestId);
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
} }

View File

@ -147,29 +147,29 @@ TEST(testCase, connect_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
//TEST(testCase, create_db_Test) { TEST(testCase, create_db_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL); ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create database abc1 vgroups 4"); pRes = taos_query(pConn, "create database abc1 vgroups 4");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_close(pConn); taos_close(pConn);
//} }
// //
//TEST(testCase, create_dnode_Test) { //TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -249,36 +249,36 @@ TEST(testCase, connect_Test) {
//// taos_close(pConn); //// taos_close(pConn);
//} //}
// TEST(testCase, create_stable_Test) { TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// } }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL); ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -465,7 +465,7 @@ TEST(testCase, connect_Test) {
TEST(testCase, create_multiple_tables) { TEST(testCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
@ -477,6 +477,14 @@ TEST(testCase, create_multiple_tables) {
ASSERT_TRUE(false); ASSERT_TRUE(false);
} }
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
@ -490,3 +498,51 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, generated_request_id_test) {
uint64_t id0 = generateRequestId();
uint64_t id1 = generateRequestId();
uint64_t id2 = generateRequestId();
uint64_t id3 = generateRequestId();
uint64_t id4 = generateRequestId();
ASSERT_NE(id0, id1);
ASSERT_NE(id1, id2);
ASSERT_NE(id2, id3);
ASSERT_NE(id4, id3);
ASSERT_NE(id0, id2);
ASSERT_NE(id0, id4);
ASSERT_NE(id0, id3);
// SHashObj *phash = taosHashInit()
}
//TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select * from t_2");
// if (taos_errno(pRes) != 0) {
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}

View File

@ -140,6 +140,7 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
buf = taosDecodeFixedU64(buf, &pReq->ver); buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize); buf = taosDecodeFixedU32(buf, &nsize);
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
for (size_t i = 0; i < nsize; i++) { for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req; SVCreateTbReq req;
buf = tDeserializeSVCreateTbReq(buf, &req); buf = tDeserializeSVCreateTbReq(buf, &req);

View File

@ -105,7 +105,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
*/ */
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan); int32_t stringToSubplan(const char* str, SSubplan** subplan);

View File

@ -309,6 +309,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
subplan->pNode = NULL; subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY; subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType; subplan->msgType = pPayload->msgType;
subplan->id.queryId = pCxt->pDag->queryId;
RECOVERY_CURRENT_SUBPLAN(pCxt); RECOVERY_CURRENT_SUBPLAN(pCxt);
} }
@ -328,7 +329,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
// todo deal subquery // todo deal subquery
} }
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) { int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
.pCatalog = pCatalog, .pCatalog = pCatalog,
@ -338,6 +339,8 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
}; };
*pDag = context.pDag; *pDag = context.pDag;
context.pDag->queryId = requestId;
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode); createSubplanByLevel(&context, pQueryNode);
} CATCH(code) { } CATCH(code) {

View File

@ -24,7 +24,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo // todo
} }
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) { int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
SQueryPlanNode* logicPlan; SQueryPlanNode* logicPlan;
int32_t code = createQueryPlan(pNode, &logicPlan); int32_t code = createQueryPlan(pNode, &logicPlan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -38,7 +38,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
return code; return code;
} }
code = createDag(logicPlan, NULL, pDag); code = createDag(logicPlan, NULL, pDag, requestId);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
qDestroyQueryDag(*pDag); qDestroyQueryDag(*pDag);

View File

@ -45,7 +45,8 @@ protected:
int32_t run() { int32_t run() {
SQueryDag* dag = nullptr; SQueryDag* dag = nullptr;
int32_t code = createDag(logicPlan_.get(), nullptr, &dag); uint64_t requestId = 20;
int32_t code = createDag(logicPlan_.get(), nullptr, &dag, requestId);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }
@ -60,7 +61,8 @@ protected:
return code; return code;
} }
SQueryDag* dag = nullptr; SQueryDag* dag = nullptr;
code = qCreateQueryDag(query, &dag); uint64_t requestId = 20;
code = qCreateQueryDag(query, &dag, requestId);
dag_.reset(dag); dag_.reset(dag);
return code; return code;
} }

View File

@ -700,7 +700,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(taosRand() & 0xFFFF); pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPid() + (int64_t)pConn->tranId); pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);

View File

@ -1130,8 +1130,4 @@ SysNameInfo taosGetSysNameInfo() {
return info; return info;
} }
int64_t taosGetPid() {
getpid();
}
#endif #endif