From 31ce27054bde733da1b3e4c244637545b02191ae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 17 Jan 2022 05:30:05 +0000 Subject: [PATCH 1/9] more --- .devcontainer/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index fc9a51af65..01172ae9c9 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,3 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} # [Optional] Uncomment this section to install additional packages. # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends +RUN apt-get update && apt-get -y install tree vim From feef8c395dca6cbb53d396304121d7ff2e6f7018 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 18 Jan 2022 02:37:20 +0000 Subject: [PATCH 2/9] more --- source/libs/tdb/inc/tdb.h | 2 +- source/libs/tdb/src/db/tdbDB.c | 25 ++++++++++++++++++++++--- source/libs/tdb/src/inc/tdbDB.h | 11 +++++++++++ source/libs/tdb/test/tdbTest.cpp | 6 +++--- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 905b08ee0b..eee8f8ed33 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -44,7 +44,7 @@ typedef struct { // TDB Operations TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type); -TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags); +TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags); TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags); #ifdef __cplusplus diff --git a/source/libs/tdb/src/db/tdbDB.c b/source/libs/tdb/src/db/tdbDB.c index 2af40d8642..eaf85ea4a1 100644 --- a/source/libs/tdb/src/db/tdbDB.c +++ b/source/libs/tdb/src/db/tdbDB.c @@ -56,9 +56,28 @@ _err: return 0; } -TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) { - // TODO - return 0; +TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags) { + int ret = 0; + + if ((dbp->fname = strdup(fname)) == NULL) { + ret = -1; + return ret; + } + + // Create the backup file if the file not exists + + // Open the file as a sub-db or a master-db + if (dbname) { + if ((dbp->dbname = strdup(dbname)) == NULL) { + ret = -1; + return ret; + } + // TODO: Open the DB as a SUB-DB in this file + } else { + // TODO: Open the DB as a MASTER-DB in this file + } + + return ret; } TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) { diff --git a/source/libs/tdb/src/inc/tdbDB.h b/source/libs/tdb/src/inc/tdbDB.h index fca197dc39..d0ef9e22d0 100644 --- a/source/libs/tdb/src/inc/tdbDB.h +++ b/source/libs/tdb/src/inc/tdbDB.h @@ -25,6 +25,14 @@ extern "C" { #endif +typedef struct { + // TODO +} TDB_MPOOL; + +typedef struct { + int fd; +} TDB_FH; + struct TDB { pgsize_t pageSize; tdb_db_t type; @@ -35,6 +43,9 @@ struct TDB { TDB_HASH * hash; TDB_HEAP * heap; } dbam; // db access method + + TDB_FH * fhp; // The backup file handle + TDB_MPOOL *mph; // The memory pool handle }; #ifdef __cplusplus diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 38c3b0b917..f27e17f1ca 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) { int ret; TDB *dbp; - tdbCreateDB(&dbp, TDB_BTREE_T); + // tdbCreateDB(&dbp, TDB_BTREE_T); - tdbOpenDB(dbp, 0); + // tdbOpenDB(dbp, 0); - tdbCloseDB(dbp, 0); + // tdbCloseDB(dbp, 0); } \ No newline at end of file From 9d50c907cf527ce164d861110c8ad329239ab48c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 18 Jan 2022 06:31:44 +0000 Subject: [PATCH 3/9] more --- source/dnode/vnode/inc/meta.h | 7 ++- source/dnode/vnode/src/meta/metaBDBImpl.c | 55 +++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 031ed178f0..75ddc37a03 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -42,7 +42,8 @@ typedef struct { SSchema *pSchema; } SSchemaWrapper; -typedef struct SMTbCursor SMTbCursor; +typedef struct SMTbCursor SMTbCursor; +typedef struct SMCtbCursor SMCtbCursor; typedef SVCreateTbReq STbCfg; @@ -64,6 +65,10 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); char * metaTbCursorNext(SMTbCursor *pTbCur); +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); +char * metaCtbCursorNext(SMCtbCursor *pCtbCur); + // Options void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsClear(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index ae13d9fddc..490333397f 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -621,4 +621,59 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { tdDestroyTSchemaBuilder(&sb); return pTSchema; +} + +struct SMCtbCursor { + DBC * pCur; + tb_uid_t suid; +}; + +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { + SMCtbCursor *pCtbCur = NULL; + SMetaDB * pDB = pMeta->pDB; + int ret; + + pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur)); + if (pCtbCur == NULL) { + return NULL; + } + + pCtbCur->suid = uid; + ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0); + if (ret != 0) { + free(pCtbCur); + return NULL; + } + + return pCtbCur; +} + +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { + if (pCtbCur) { + if (pCtbCur->pCur) { + pCtbCur->pCur->close(pCtbCur->pCur); + } + + free(pCtbCur); + } +} + +char *metaCtbCursorNext(SMCtbCursor *pCtbCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; + void * pBuf; + STbCfg tbCfg; + + // Set key + skey.data = &(pCtbCur->suid); + skey.size = sizeof(pCtbCur->suid); + + if (pCtbCur->pCur->get(pCtbCur->pCur, &skey, &pval, DB_NEXT) == 0) { + pBuf = pval.data; + metaDecodeTbInfo(pBuf, &tbCfg); + return tbCfg.name; + } else { + return NULL; + } } \ No newline at end of file From 6d9ae1af999861f771aca6ad78f669f741cf173c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jan 2022 19:08:07 +0800 Subject: [PATCH 4/9] [td-11818]Support select * from super_table. --- source/client/src/clientImpl.c | 17 +- source/client/test/clientTests.cpp | 192 ++++++++++----------- source/dnode/vnode/inc/meta.h | 2 +- source/dnode/vnode/inc/tsdb.h | 19 ++ source/dnode/vnode/src/meta/metaBDBImpl.c | 14 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 42 ++--- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorMain.c | 50 +++++- source/libs/executor/src/executorimpl.c | 13 +- source/libs/planner/src/physicalPlanJson.c | 28 ++- source/libs/qworker/CMakeLists.txt | 16 -- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/scheduler.c | 20 ++- 13 files changed, 240 insertions(+), 177 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 51f267e884..df0f060fbe 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -257,7 +257,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { return pRequest->code; } - return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); + SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); + + SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 0}; + addr.epAddr[0].port = 6030; + strcpy(addr.epAddr[0].fqdn, "ubuntu"); + + taosArrayPush(execNode, &addr); + return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); } typedef struct tmq_t tmq_t; @@ -706,9 +713,13 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); - setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); + int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + return NULL; + } + setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); if (pResultInfo->numOfRows == 0) { return NULL; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..59b133338e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,7 +49,7 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, connect_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } @@ -295,24 +295,24 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -TEST(testCase, create_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); - ASSERT_EQ(taos_errno(pRes), 0); - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); - ASSERT_NE(taos_errno(pRes), 0); - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, create_table_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); +// ASSERT_EQ(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); +// ASSERT_NE(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// taos_close(pConn); +//} //TEST(testCase, create_ctable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) { // taos_close(pConn); //} -TEST(testCase, show_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); - -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +//TEST(testCase, show_stable_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != nullptr); +// +//// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +// +// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); // if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); // } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// // taos_free_result(pRes); - - TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); - if (taos_errno(pRes) != 0) { - printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} +// taos_close(pConn); +//} // //TEST(testCase, show_vgroup_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -521,29 +521,29 @@ TEST(testCase, show_stable_Test) { // taosHashCleanup(phash); //} // -TEST(testCase, create_topic_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == nullptr); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_free_result(pRes); - - char* sql = "select * from tu"; - pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, create_topic_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == nullptr); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// taos_free_result(pRes); +// +// char* sql = "select * from tu"; +// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); +// taos_free_result(pRes); +// taos_close(pConn); +//} //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -614,30 +614,30 @@ TEST(testCase, create_topic_Test) { // taos_close(pConn); //} -//TEST(testCase, projection_query_stables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "select ts,k from m1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, projection_query_stables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts from m1"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 75ddc37a03..383073871e 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -67,7 +67,7 @@ char * metaTbCursorNext(SMTbCursor *pTbCur); SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); -char * metaCtbCursorNext(SMCtbCursor *pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); // Options void metaOptionsInit(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index f3d9f71346..25e7c8cffa 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -92,6 +92,7 @@ int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); typedef void* tsdbReadHandleT; + /** * Get the data block iterator, starting from position according to the query condition * @@ -123,6 +124,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle); +/** + * + * @param tsdb + * @param uid + * @param skey + * @param pTagCond + * @param len + * @param tagNameRelType + * @param tbnameCond + * @param pGroupInfo + * @param pColIndex + * @param numOfCols + * @param reqId + * @return + */ +int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, + int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, + SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId); /** * get num of rows in mem table * diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 490333397f..bbceaa3389 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -658,7 +658,7 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { } } -char *metaCtbCursorNext(SMCtbCursor *pCtbCur) { +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { DBT skey = {0}; DBT pkey = {0}; DBT pval = {0}; @@ -669,11 +669,13 @@ char *metaCtbCursorNext(SMCtbCursor *pCtbCur) { skey.data = &(pCtbCur->suid); skey.size = sizeof(pCtbCur->suid); - if (pCtbCur->pCur->get(pCtbCur->pCur, &skey, &pval, DB_NEXT) == 0) { - pBuf = pval.data; - metaDecodeTbInfo(pBuf, &tbCfg); - return tbCfg.name; + if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { + tb_uid_t id = *(tb_uid_t *)pkey.data; + assert(id != 0); + return id; +// metaDecodeTbInfo(pBuf, &tbCfg); +// return tbCfg.; } else { - return NULL; + return 0; } } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d07a5ffc77..15748118d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -152,7 +152,7 @@ typedef struct STsdbReadHandle { typedef struct STableGroupSupporter { int32_t numOfCols; SColIndex* pCols; - STSchema* pTagSchema; + SSchema* pTagSchema; } STableGroupSupporter; static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); @@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, return (tsdbReadHandleT)pReadHandle; _end: -// tsdbCleanupQueryHandle(pTsdbReadHandle); + tsdbCleanupQueryHandle(pReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int return numOfRows; } -static int32_t getAllTableList(STable* pSuperTable, SArray* list) { - SSkipListIterator* iter = NULL;//tSkipListCreateIter(pSuperTable->pIndex); - while (tSkipListIterNext(iter)) { - SSkipListNode* pNode = tSkipListIterGet(iter); +static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { + SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); - STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode); + while (1) { + tb_uid_t id = metaCtbCursorNext(pCur); + if (id == 0) { + break; + } - STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL}; + STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id}; taosArrayPush(list, &info); } - tSkipListDestroyIter(iter); + metaCloseCtbCurosr(pCur); return TSDB_CODE_SUCCESS; } @@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable taosArrayPush(pGroups, &g); } -SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { +SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { assert(pTableList != NULL); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); @@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC } if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table - SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo)); + SArray* sa = taosArrayDup(pTableList); if (sa == NULL) { taosArrayDestroy(pTableGroup); return NULL; } - for(int32_t i = 0; i < size; ++i) { - STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); - - STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; - taosArrayPush(sa, &info); - } - taosArrayPush(pTableGroup, &sa); tsdbDebug("all %" PRIzu " tables belong to one group", size); } else { STableGroupSupporter sup = {0}; sup.numOfCols = numOfOrderCols; - sup.pTagSchema = pTagSchema; + sup.pTagSchema = pTagSchema->pSchema; sup.pCols = pCols; // taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn); @@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch //NOTE: not add ref count for super table SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); + SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { - assert(false); - int32_t ret = 0;//getAllTableList(pTable, res); + int32_t ret = getAllTableList(tsdb->pMeta, uid, res); if (ret != TSDB_CODE_SUCCESS) { goto _error; } @@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro return TSDB_CODE_SUCCESS; } - +#endif static void* doFreeColumnInfoData(SArray* pColumnInfoData) { if (pColumnInfoData == NULL) { return NULL; @@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { return NULL; } + void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; if (pTsdbReadHandle == NULL) { @@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { tfree(pTsdbReadHandle); } +#if 0 void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { assert(pGroupList != NULL); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index adb305ab09..255b000d10 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -649,6 +649,6 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); +int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle); #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 1f5d0cd059..df28972645 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tarray.h" +#include #include "dataSinkMgt.h" #include "exception.h" +#include "os.h" +#include "tarray.h" #include "tcache.h" #include "tglobal.h" #include "tmsg.h" @@ -72,7 +73,45 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); + int32_t code = 0; + uint64_t uid = 0; + STimeWindow window = TSWINDOW_INITIALIZER; + int32_t tableType = 0; + + SPhyNode *pPhyNode = pSubplan->pNode; + if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { + STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode; + uid = pTableScanNode->scan.uid; + window = pTableScanNode->window; + tableType = pTableScanNode->scan.tableType; + } else { + assert(0); + } + + STableGroupInfo groupInfo = {0}; + if (tableType == TSDB_SUPER_TABLE) { + code = tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } else { // Create one table group. + groupInfo.numOfTables = 1; + groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + + SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); + + STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; + taosArrayPush(pa, &info); + taosArrayPush(groupInfo.pGroupList, &pa); + } + + if (groupInfo.numOfTables == 0) { + code = 0; +// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId); + goto _error; + } + + code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -141,6 +180,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + // todo: remove it. + if (tinfo == NULL) { + return NULL; + } + *pRes = NULL; int64_t curOwner = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b6df0c527e..9daf3298e4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7369,15 +7369,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask } } -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { +int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle) { STsdbQueryCond cond = {.loadExternalRows = false}; - uint64_t uid = 0; SPhyNode* pPhyNode = pPlan->pNode; if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode; - uid = pTableScanNode->scan.uid; cond.order = pTableScanNode->scan.order; cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets); cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); @@ -7397,15 +7395,8 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r assert(0); } - STableGroupInfo group = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES)}; - SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); - STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; - taosArrayPush(pa, &info); - - taosArrayPush(group.pGroupList, &pa); - *pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId); - tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL); + tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL); (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); if ((*pTaskInfo)->pRoot == NULL) { diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index c7a4e438ba..9aeae55c4f 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -558,11 +558,15 @@ static bool timeWindowToJson(const void* obj, cJSON* json) { static bool timeWindowFromJson(const cJSON* json, void* obj) { STimeWindow* win = (STimeWindow*)obj; - char* p = getString(json, jkTimeWindowStartKey); - win->skey = strtoll(p, NULL, 10); + char* pStartKey = getString(json, jkTimeWindowStartKey); + win->skey = strtoll(pStartKey, NULL, 10); + + char* pEndKey = getString(json, jkTimeWindowEndKey); + win->ekey = strtoll(pEndKey, NULL, 10); + + tfree(pStartKey); + tfree(pEndKey); - p = getString(json, jkTimeWindowEndKey); - win->ekey = strtoll(p, NULL, 10); return true; } @@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse"; static bool scanNodeToJson(const void* obj, cJSON* json) { const SScanPhyNode* pNode = (const SScanPhyNode*)obj; - bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid); + + char uid[40] = {0}; + snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid); + bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid); if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType); } + if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order); } + if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count); } @@ -589,12 +598,17 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse); } + return res; } static bool scanNodeFromJson(const cJSON* json, void* obj) { SScanPhyNode* pNode = (SScanPhyNode*)obj; - pNode->uid = getNumber(json, jkScanNodeTableId); + + char* val = getString(json, jkScanNodeTableId); + pNode->uid = strtoull(val, NULL, 10); + tfree(val); + pNode->tableType = getNumber(json, jkScanNodeTableType); pNode->count = getNumber(json, jkScanNodeTableCount); pNode->order = getNumber(json, jkScanNodeTableOrder); @@ -726,7 +740,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) { res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); } if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr)); + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, sizeof(SEpAddr), ep->numOfEps); } return res; } diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index a3db9c6992..9ada451c61 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -1,15 +1,4 @@ aux_source_directory(src QWORKER_SRC) -#add_library(qworker ${QWORKER_SRC}) -#target_include_directories( -# qworker -# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker" -# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -#) -# -#target_link_libraries( -# qworker -# PRIVATE os util transport planner qcom executor -#) add_library(qworker STATIC ${QWORKER_SRC}) target_include_directories( @@ -18,11 +7,6 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -#set_target_properties(qworker PROPERTIES -# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a" -# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker" -# ) - target_link_libraries(qworker PRIVATE os util transport planner qcom executor ) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 661beee5d5..6b047eb96e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,7 +29,7 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 -#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA +#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA enum { SCH_READ = 1, diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c53926f8c1..ffb602bd36 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -26,9 +26,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel * pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->execAddrs) { - SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -383,9 +383,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } pTask->candidateIdx = 0; - pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->candidateAddrs) { - SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -405,10 +405,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -418,12 +418,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } if (addNum <= 0) { - SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); + SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum); return TSDB_CODE_QRY_INVALID_INPUT; } /* - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; @@ -734,7 +734,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } /* - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; @@ -1165,6 +1165,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); SCH_ERR_JRET(code); } + + printf("physical plan:%s\n", pTask->msg); } SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); From 3f4551fa01b219fb6d7a94566df8cb40a3f470d6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 18 Jan 2022 11:44:03 +0000 Subject: [PATCH 5/9] fix ctb cursor issue --- source/dnode/vnode/src/meta/metaBDBImpl.c | 25 ++++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index bbceaa3389..d8ee48ff23 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -374,22 +374,27 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey DBT * pDbt; if (pTbCfg->type == META_CHILD_TABLE) { - pDbt = calloc(2, sizeof(DBT)); + // pDbt = calloc(2, sizeof(DBT)); - // First key is suid - pDbt[0].data = &(pTbCfg->ctbCfg.suid); - pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); + // // First key is suid + // pDbt[0].data = &(pTbCfg->ctbCfg.suid); + // pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); - // Second key is the first tag - void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); - pDbt[1].data = pTagVal; - pDbt[1].size = sizeof(int32_t); + // // Second key is the first tag + // void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); + // pDbt[1].data = pTagVal; + // pDbt[1].size = sizeof(int32_t); // Set index key memset(pSKey, 0, sizeof(*pSKey)); +#if 0 pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; pSKey->data = pDbt; pSKey->size = 2; +#else + pSKey->data = &(pTbCfg->ctbCfg.suid); + pSKey->size = sizeof(pTbCfg->ctbCfg.suid); +#endif return 0; } else { @@ -673,8 +678,8 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { tb_uid_t id = *(tb_uid_t *)pkey.data; assert(id != 0); return id; -// metaDecodeTbInfo(pBuf, &tbCfg); -// return tbCfg.; + // metaDecodeTbInfo(pBuf, &tbCfg); + // return tbCfg.; } else { return 0; } From 1145d1a0ea1248f9de562400f3b9d8baf7b53bcd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jan 2022 16:20:13 +0800 Subject: [PATCH 6/9] [td-11818] select * --- source/client/src/clientImpl.c | 2 +- source/libs/executor/src/executorMain.c | 20 ++++++++++---------- source/libs/executor/src/executorimpl.c | 7 ++++++- source/libs/planner/src/physicalPlanJson.c | 8 ++++---- source/libs/qworker/src/qworker.c | 2 ++ 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8fdde9f4e9..10d1d09c85 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -259,7 +259,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); - SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 1}; + SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2}; addr.epAddr[0].port = 6030; strcpy(addr.epAddr[0].fqdn, "localhost"); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 9fad5242ba..56e2977753 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -73,12 +73,12 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = 0; - uint64_t uid = 0; - STimeWindow window = TSWINDOW_INITIALIZER; - int32_t tableType = 0; + int32_t code = 0; + uint64_t uid = 0; + STimeWindow window = TSWINDOW_INITIALIZER; + int32_t tableType = 0; - SPhyNode *pPhyNode = pSubplan->pNode; + SPhyNode* pPhyNode = pSubplan->pNode; STableGroupInfo groupInfo = {0}; int32_t type = pPhyNode->info.type; @@ -112,10 +112,10 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ } } - code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; code = dsDataSinkMgtInit(&cfg); @@ -127,7 +127,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ *handle = (*pTask)->dsHandle; - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1b3c93ab61..4cb233f71e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4952,7 +4952,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { } static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo *pOperator = (SOperatorInfo*) param; SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; @@ -5012,9 +5012,14 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; memcpy(tmp, pData, len); + + pColInfoData->pData = tmp; pData += len; } + pRes->info.numOfCols = pOperator->numOfOutput; + pRes->info.rows = pExchangeInfo->pRsp->numOfRows; + return pExchangeInfo->pResult; } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 1b6cf89c56..bf38712b13 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -559,10 +559,10 @@ static bool timeWindowFromJson(const cJSON* json, void* obj) { STimeWindow* win = (STimeWindow*)obj; char* pStartKey = getString(json, jkTimeWindowStartKey); - win->skey = strtoll(pStartKey, NULL, 10); + win->skey = strtoul(pStartKey, NULL, 10); char* pEndKey = getString(json, jkTimeWindowEndKey); - win->ekey = strtoll(pEndKey, NULL, 10); + win->ekey = strtoul(pEndKey, NULL, 10); tfree(pStartKey); tfree(pEndKey); @@ -783,7 +783,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) { pSource->taskId = getNumber(json, jkNodeTaskId); char* pSchedId = getString(json, jkNodeTaskSchedId); - pSource->schedId = strtoll(pSchedId, NULL, 10); + pSource->schedId = strtoul(pSchedId, NULL, 10); tfree(pSchedId); bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true); @@ -1032,7 +1032,7 @@ static bool subplanIdFromJson(const cJSON* json, void* obj) { SSubplanId* id = (SSubplanId*)obj; char* queryId = getString(json, jkIdQueryId); - id->queryId = strtoll(queryId, NULL, 0); + id->queryId = strtoul(queryId, NULL, 0); tfree(queryId); id->templateId = getNumber(json, jkIdTemplateId); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a0beaba61d..d01f4f4e52 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -258,6 +258,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); + printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId); + SQWTaskCtx nctx = {0}; QW_LOCK(QW_WRITE, &mgmt->ctxLock); From 0330ab12ad1e6759fafc5f095103693096140a52 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jan 2022 17:03:29 +0800 Subject: [PATCH 7/9] [td-11818] Refactor. --- source/libs/executor/src/executorimpl.c | 65 +++++++++++++++++-------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4cb233f71e..27c24669cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4558,6 +4558,7 @@ void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); +void createResultBlock(const SArray* pExprInfo, SExchangeInfo* pInfo, const SOperatorInfo* pOperator, size_t size); static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0 @@ -4930,7 +4931,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; assert(pMsg->ahandle != NULL); @@ -4958,13 +4959,14 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; *newgroup = false; - if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { return NULL; } SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); if (NULL == pMsg) { // todo handle malloc error + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); @@ -4983,6 +4985,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } pMsgSendInfo->param = pExchangeInfo; @@ -4993,7 +4997,6 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pExchangeInfo->ready); if (pExchangeInfo->pRsp->numOfRows == 0) { @@ -5007,7 +5010,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); if (tmp == NULL) { - // todo + goto _error; } size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; @@ -5021,8 +5024,17 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pRes->info.rows = pExchangeInfo->pRsp->numOfRows; return pExchangeInfo->pResult; + + _error: + tfree(pMsg); + tfree(pMsgSendInfo); + + terrno = pTaskInfo->code; + return NULL; } +static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); + SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5038,21 +5050,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* assert(taosArrayGetSize(pInfo->pSources) > 0); size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = calloc(1, sizeof(SSDataBlock)); - pInfo->pResult->pDataBlock = taosArrayInit(pOperator->numOfOutput, sizeof(SColumnInfoData)); - - SArray* pResult = pInfo->pResult->pDataBlock; - for(int32_t i = 0; i < size; ++i) { - SColumnInfoData colInfoData = {0}; - SExprInfo* p = taosArrayGetP(pExprInfo, i); - - SSchema* pSchema = &p->base.resSchema; - colInfoData.info.type = pSchema->type; - colInfoData.info.colId = pSchema->colId; - colInfoData.info.bytes = pSchema->bytes; - - taosArrayPush(pResult, &colInfoData); - } + pInfo->pResult = createResultDataBlock(pExprInfo); pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; @@ -5064,13 +5062,13 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; - { + { // todo refactor SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processMsgFromServer; + rpcInit.cfp = processRspMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)"root"; @@ -5088,6 +5086,31 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* return pOperator; } +SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { + SSDataBlock* pResBlock = calloc(1, sizeof(SSDataBlock)); + if (pResBlock == NULL) { + return NULL; + } + + size_t numOfCols = taosArrayGetSize(pExprInfo); + pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + SArray* pResult = pResBlock->pDataBlock; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfoData = {0}; + SExprInfo* p = taosArrayGetP(pExprInfo, i); + + SSchema* pSchema = &p->base.resSchema; + colInfoData.info.type = pSchema->type; + colInfoData.info.colId = pSchema->colId; + colInfoData.info.bytes = pSchema->bytes; + + taosArrayPush(pResult, &colInfoData); + } + + return pResBlock; +} + SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0 && numOfOutput > 0); From 0399125bb829f2c5576fe064dc2a521104beef63 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jan 2022 18:22:25 +0800 Subject: [PATCH 8/9] [td-11818] Refactor. --- source/client/test/clientTests.cpp | 100 ++++++++++----------- source/libs/planner/src/physicalPlanJson.c | 43 ++++----- source/libs/planner/src/planner.c | 6 +- 3 files changed, 70 insertions(+), 79 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 59b133338e..4361131f01 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -521,29 +521,29 @@ TEST(testCase, connect_Test) { // taosHashCleanup(phash); //} // -//TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == nullptr); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// char* sql = "select * from tu"; -// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -614,30 +614,30 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -TEST(testCase, projection_query_stables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "select ts from m1"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, projection_query_stables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select ts from m1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index bf38712b13..db1b1c84d1 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -29,6 +29,14 @@ static void copyString(const cJSON* json, const char* name, char* dst) { strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); } +static uint64_t getBigintFromString(const cJSON* json, const char* name) { + char* val = getString(json, name); + uint64_t intVal = strtoul(val, NULL, 10); + tfree(val); + + return intVal; +} + static int64_t getNumber(const cJSON* json, const char* name) { double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); return (int64_t) d; @@ -543,13 +551,13 @@ static const char* jkTimeWindowEndKey = "EndKey"; static bool timeWindowToJson(const void* obj, cJSON* json) { const STimeWindow* win = (const STimeWindow*)obj; - char tmp[32] = {0}; - sprintf(tmp, "%"PRId64, win->skey); + char tmp[40] = {0}; + snprintf(tmp, tListLen(tmp),"%"PRId64, win->skey); bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp); if (res) { memset(tmp, 0, tListLen(tmp)); - sprintf(tmp, "%"PRId64, win->ekey); + snprintf(tmp, tListLen(tmp),"%"PRId64, win->ekey); res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp); } return res; @@ -557,16 +565,8 @@ static bool timeWindowToJson(const void* obj, cJSON* json) { static bool timeWindowFromJson(const cJSON* json, void* obj) { STimeWindow* win = (STimeWindow*)obj; - - char* pStartKey = getString(json, jkTimeWindowStartKey); - win->skey = strtoul(pStartKey, NULL, 10); - - char* pEndKey = getString(json, jkTimeWindowEndKey); - win->ekey = strtoul(pEndKey, NULL, 10); - - tfree(pStartKey); - tfree(pEndKey); - + win->skey = getBigintFromString(json, jkTimeWindowStartKey); + win->ekey = getBigintFromString(json, jkTimeWindowEndKey); return true; } @@ -605,10 +605,7 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { static bool scanNodeFromJson(const cJSON* json, void* obj) { SScanPhyNode* pNode = (SScanPhyNode*)obj; - char* val = getString(json, jkScanNodeTableId); - pNode->uid = strtoull(val, NULL, 10); - tfree(val); - + pNode->uid = getBigintFromString(json, jkScanNodeTableId); pNode->tableType = getNumber(json, jkScanNodeTableType); pNode->count = getNumber(json, jkScanNodeTableCount); pNode->order = getNumber(json, jkScanNodeTableOrder); @@ -782,10 +779,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) { SDownstreamSource* pSource = (SDownstreamSource*)obj; pSource->taskId = getNumber(json, jkNodeTaskId); - char* pSchedId = getString(json, jkNodeTaskSchedId); - pSource->schedId = strtoul(pSchedId, NULL, 10); - tfree(pSchedId); - + pSource->schedId = getBigintFromString(json, jkNodeTaskSchedId); bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true); return res; } @@ -1031,12 +1025,9 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) { static bool subplanIdFromJson(const cJSON* json, void* obj) { SSubplanId* id = (SSubplanId*)obj; - char* queryId = getString(json, jkIdQueryId); - id->queryId = strtoul(queryId, NULL, 0); - tfree(queryId); - + id->queryId = getBigintFromString(json, jkIdQueryId); id->templateId = getNumber(json, jkIdTemplateId); - id->subplanId = getNumber(json, jkIdSubplanId); + id->subplanId = getNumber(json, jkIdSubplanId); return true; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 6b3f37741e..9b32213ad7 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { - char* str = NULL; - queryPlanToString(pLogicPlan, &str); - printf("%s\n", str); +// char* str = NULL; +// queryPlanToString(pLogicPlan, &str); +// printf("%s\n", str); } code = optimizeQueryPlan(pLogicPlan); From f04292b8d49995e6ce1c99378863b4132d270e92 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jan 2022 22:35:25 +0800 Subject: [PATCH 9/9] [td-11818]remove SParseBasicCtx --- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 28 ++++++++++++++++++++++---- source/client/src/clientMsgHandler.c | 2 +- source/libs/parser/src/dCDAstProcess.c | 9 +++++++++ source/util/src/tconfig.c | 2 +- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b6f38b12ca..1d10869e30 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -179,7 +179,7 @@ extern int32_t clientConnRefPool; extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); -SMsgSendInfo* buildMsgInfoImpl(SRequestObj*); +SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); int taos_init(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 10d1d09c85..75b652e40d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -195,8 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { } asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); } else { - SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); } tsem_wait(&pRequest->body.rspSem); @@ -706,6 +705,8 @@ void* doFetchRow(SRequestObj* pRequest) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + SEpSet epSet = {0}; + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pRequest->type == TDMT_VND_QUERY) { // All data has returned to App already, no need to try again @@ -729,6 +730,17 @@ void* doFetchRow(SRequestObj* pRequest) { pRequest->type = TDMT_MND_SHOW_RETRIEVE; } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; + SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + + epSet.numOfEps = pVgroupInfo->numOfEps; + epSet.inUse = pVgroupInfo->inUse; + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); + epSet.port[i] = pVgroupInfo->epAddr[i].port; + } + } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { pRequest->type = TDMT_VND_SHOW_TABLES; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; @@ -746,9 +758,17 @@ void* doFetchRow(SRequestObj* pRequest) { SMsgSendInfo* body = buildMsgInfoImpl(pRequest); + epSet.numOfEps = pVgroupInfo->numOfEps; + epSet.inUse = pVgroupInfo->inUse; + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); + epSet.port[i] = pVgroupInfo->epAddr[i].port; + } + int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; @@ -758,7 +778,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 497ef1ac95..02e36043dc 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -115,7 +115,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { } } else { assert(pRequest != NULL); - pMsgSendInfo->msgInfo = pRequest->body.requestMsg; + pMsgSendInfo->msgInfo = pRequest->body.requestMsg; } pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 955507ae1b..3b7b9e44d9 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -768,6 +768,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER) ? TDMT_MND_CREATE_USER : TDMT_MND_ALTER_USER; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -809,6 +810,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT) ? TDMT_MND_CREATE_ACCT : TDMT_MND_ALTER_ACCT; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -816,6 +818,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch case TSDB_SQL_DROP_USER: { pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT) ? TDMT_MND_DROP_ACCT : TDMT_MND_DROP_USER; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -852,6 +855,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)pUseDbMsg; pDcl->msgLen = sizeof(SUseDbReq); pDcl->msgType = TDMT_MND_USE_DB; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -880,6 +884,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch goto _error; } + pDcl->epSet = pCtx->mgmtEpSet; pDcl->pMsg = (char*)pCreateMsg; pDcl->msgLen = sizeof(SCreateDbReq); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB; @@ -908,6 +913,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgLen = sizeof(SDropDbReq); pDcl->pMsg = (char*)pDropDbMsg; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -920,6 +926,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = TDMT_MND_CREATE_STB; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -940,6 +947,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch } pDcl->msgType = TDMT_MND_CREATE_DNODE; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -950,6 +958,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch } pDcl->msgType = TDMT_MND_DROP_DNODE; + pDcl->epSet = pCtx->mgmtEpSet; break; } diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 726247d450..469da11d93 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tconfig.h" -#include "ulog.h" #include "tutil.h" +#include "ulog.h" SGlobalCfg tsGlobalConfig[TSDB_CFG_MAX_NUM] = {{0}}; int32_t tsGlobalConfigNum = 0;