diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 51f267e884..df0f060fbe 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -257,7 +257,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return pRequest->code;
}
- return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
+ SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr));
+
+ SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 0};
+ addr.epAddr[0].port = 6030;
+ strcpy(addr.epAddr[0].fqdn, "ubuntu");
+
+ taosArrayPush(execNode, &addr);
+ return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
}
typedef struct tmq_t tmq_t;
@@ -706,9 +713,13 @@ void* doFetchRow(SRequestObj* pRequest) {
return NULL;
}
- scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
- setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
+ int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
+ if (code != TSDB_CODE_SUCCESS) {
+ pRequest->code = code;
+ return NULL;
+ }
+ setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
if (pResultInfo->numOfRows == 0) {
return NULL;
}
diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp
index 415d6a57ce..59b133338e 100644
--- a/source/client/test/clientTests.cpp
+++ b/source/client/test/clientTests.cpp
@@ -49,7 +49,7 @@ int main(int argc, char** argv) {
TEST(testCase, driverInit_Test) { taos_init(); }
TEST(testCase, connect_Test) {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
}
@@ -295,24 +295,24 @@ TEST(testCase, connect_Test) {
// taos_close(pConn);
//}
-TEST(testCase, create_table_Test) {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- assert(pConn != NULL);
-
- TAOS_RES* pRes = taos_query(pConn, "use abc1");
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
- ASSERT_EQ(taos_errno(pRes), 0);
-
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
- ASSERT_NE(taos_errno(pRes), 0);
-
- taos_free_result(pRes);
- taos_close(pConn);
-}
+//TEST(testCase, create_table_Test) {
+// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+// assert(pConn != NULL);
+//
+// TAOS_RES* pRes = taos_query(pConn, "use abc1");
+// taos_free_result(pRes);
+//
+// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
+// ASSERT_EQ(taos_errno(pRes), 0);
+//
+// taos_free_result(pRes);
+//
+// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
+// ASSERT_NE(taos_errno(pRes), 0);
+//
+// taos_free_result(pRes);
+// taos_close(pConn);
+//}
//TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) {
// taos_close(pConn);
//}
-TEST(testCase, show_stable_Test) {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- assert(pConn != nullptr);
-
-// TAOS_RES* pRes = taos_query(pConn, "use abc1");
+//TEST(testCase, show_stable_Test) {
+// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+// assert(pConn != nullptr);
+//
+//// TAOS_RES* pRes = taos_query(pConn, "use abc1");
+//// if (taos_errno(pRes) != 0) {
+//// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
+//// }
+//// taos_free_result(pRes);
+//
+// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
// if (taos_errno(pRes) != 0) {
-// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
+// printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
+// taos_free_result(pRes);
+// ASSERT_TRUE(false);
// }
+//
+// TAOS_ROW pRow = NULL;
+// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+// int32_t numOfFields = taos_num_fields(pRes);
+//
+// char str[512] = {0};
+// while ((pRow = taos_fetch_row(pRes)) != NULL) {
+// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
+// printf("%s\n", str);
+// }
+//
// taos_free_result(pRes);
-
- TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
- if (taos_errno(pRes) != 0) {
- printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
- taos_free_result(pRes);
- ASSERT_TRUE(false);
- }
-
- TAOS_ROW pRow = NULL;
- TAOS_FIELD* pFields = taos_fetch_fields(pRes);
- int32_t numOfFields = taos_num_fields(pRes);
-
- char str[512] = {0};
- while ((pRow = taos_fetch_row(pRes)) != NULL) {
- int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
- printf("%s\n", str);
- }
-
- taos_free_result(pRes);
- taos_close(pConn);
-}
+// taos_close(pConn);
+//}
//
//TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@@ -521,29 +521,29 @@ TEST(testCase, show_stable_Test) {
// taosHashCleanup(phash);
//}
//
-TEST(testCase, create_topic_Test) {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- assert(pConn != NULL);
-
- TAOS_RES* pRes = taos_query(pConn, "use abc1");
- if (taos_errno(pRes) != 0) {
- printf("error in use db, reason:%s\n", taos_errstr(pRes));
- }
- taos_free_result(pRes);
-
- TAOS_FIELD* pFields = taos_fetch_fields(pRes);
- ASSERT_TRUE(pFields == nullptr);
-
- int32_t numOfFields = taos_num_fields(pRes);
- ASSERT_EQ(numOfFields, 0);
-
- taos_free_result(pRes);
-
- char* sql = "select * from tu";
- pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
- taos_free_result(pRes);
- taos_close(pConn);
-}
+//TEST(testCase, create_topic_Test) {
+// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+// assert(pConn != NULL);
+//
+// TAOS_RES* pRes = taos_query(pConn, "use abc1");
+// if (taos_errno(pRes) != 0) {
+// printf("error in use db, reason:%s\n", taos_errstr(pRes));
+// }
+// taos_free_result(pRes);
+//
+// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+// ASSERT_TRUE(pFields == nullptr);
+//
+// int32_t numOfFields = taos_num_fields(pRes);
+// ASSERT_EQ(numOfFields, 0);
+//
+// taos_free_result(pRes);
+//
+// char* sql = "select * from tu";
+// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
+// taos_free_result(pRes);
+// taos_close(pConn);
+//}
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@@ -614,30 +614,30 @@ TEST(testCase, create_topic_Test) {
// taos_close(pConn);
//}
-//TEST(testCase, projection_query_stables) {
-// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
-// ASSERT_NE(pConn, nullptr);
-//
-// TAOS_RES* pRes = taos_query(pConn, "use abc1");
-// taos_free_result(pRes);
-//
-// pRes = taos_query(pConn, "select ts,k from m1");
-// if (taos_errno(pRes) != 0) {
-// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
-// taos_free_result(pRes);
-// ASSERT_TRUE(false);
-// }
-//
-// TAOS_ROW pRow = NULL;
-// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
-// int32_t numOfFields = taos_num_fields(pRes);
-//
-// char str[512] = {0};
-// while ((pRow = taos_fetch_row(pRes)) != NULL) {
-// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
-// printf("%s\n", str);
-// }
-//
-// taos_free_result(pRes);
-// taos_close(pConn);
-//}
+TEST(testCase, projection_query_stables) {
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+ ASSERT_NE(pConn, nullptr);
+
+ TAOS_RES* pRes = taos_query(pConn, "use abc1");
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "select ts from m1");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
+ taos_free_result(pRes);
+ ASSERT_TRUE(false);
+ }
+
+ TAOS_ROW pRow = NULL;
+ TAOS_FIELD* pFields = taos_fetch_fields(pRes);
+ int32_t numOfFields = taos_num_fields(pRes);
+
+ char str[512] = {0};
+ while ((pRow = taos_fetch_row(pRes)) != NULL) {
+ int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
+ printf("%s\n", str);
+ }
+
+ taos_free_result(pRes);
+ taos_close(pConn);
+}
diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h
index 75ddc37a03..383073871e 100644
--- a/source/dnode/vnode/inc/meta.h
+++ b/source/dnode/vnode/inc/meta.h
@@ -67,7 +67,7 @@ char * metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
-char * metaCtbCursorNext(SMCtbCursor *pCtbCur);
+tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h
index f3d9f71346..25e7c8cffa 100644
--- a/source/dnode/vnode/inc/tsdb.h
+++ b/source/dnode/vnode/inc/tsdb.h
@@ -92,6 +92,7 @@ int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT;
+
/**
* Get the data block iterator, starting from position according to the query condition
*
@@ -123,6 +124,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
+/**
+ *
+ * @param tsdb
+ * @param uid
+ * @param skey
+ * @param pTagCond
+ * @param len
+ * @param tagNameRelType
+ * @param tbnameCond
+ * @param pGroupInfo
+ * @param pColIndex
+ * @param numOfCols
+ * @param reqId
+ * @return
+ */
+int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
+ int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
+ SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId);
/**
* get num of rows in mem table
*
diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c
index 490333397f..bbceaa3389 100644
--- a/source/dnode/vnode/src/meta/metaBDBImpl.c
+++ b/source/dnode/vnode/src/meta/metaBDBImpl.c
@@ -658,7 +658,7 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
}
}
-char *metaCtbCursorNext(SMCtbCursor *pCtbCur) {
+tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
@@ -669,11 +669,13 @@ char *metaCtbCursorNext(SMCtbCursor *pCtbCur) {
skey.data = &(pCtbCur->suid);
skey.size = sizeof(pCtbCur->suid);
- if (pCtbCur->pCur->get(pCtbCur->pCur, &skey, &pval, DB_NEXT) == 0) {
- pBuf = pval.data;
- metaDecodeTbInfo(pBuf, &tbCfg);
- return tbCfg.name;
+ if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
+ tb_uid_t id = *(tb_uid_t *)pkey.data;
+ assert(id != 0);
+ return id;
+// metaDecodeTbInfo(pBuf, &tbCfg);
+// return tbCfg.;
} else {
- return NULL;
+ return 0;
}
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index d07a5ffc77..15748118d7 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -152,7 +152,7 @@ typedef struct STsdbReadHandle {
typedef struct STableGroupSupporter {
int32_t numOfCols;
SColIndex* pCols;
- STSchema* pTagSchema;
+ SSchema* pTagSchema;
} STableGroupSupporter;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
@@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return (tsdbReadHandleT)pReadHandle;
_end:
-// tsdbCleanupQueryHandle(pTsdbReadHandle);
+ tsdbCleanupQueryHandle(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
@@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return numOfRows;
}
-static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
- SSkipListIterator* iter = NULL;//tSkipListCreateIter(pSuperTable->pIndex);
- while (tSkipListIterNext(iter)) {
- SSkipListNode* pNode = tSkipListIterGet(iter);
+static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
+ SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
- STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode);
+ while (1) {
+ tb_uid_t id = metaCtbCursorNext(pCur);
+ if (id == 0) {
+ break;
+ }
- STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL};
+ STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id};
taosArrayPush(list, &info);
}
- tSkipListDestroyIter(iter);
+ metaCloseCtbCurosr(pCur);
return TSDB_CODE_SUCCESS;
}
@@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g);
}
-SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
+SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
@@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
- SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo));
+ SArray* sa = taosArrayDup(pTableList);
if (sa == NULL) {
taosArrayDestroy(pTableGroup);
return NULL;
}
- for(int32_t i = 0; i < size; ++i) {
- STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
-
- STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
- taosArrayPush(sa, &info);
- }
-
taosArrayPush(pTableGroup, &sa);
tsdbDebug("all %" PRIzu " tables belong to one group", size);
} else {
STableGroupSupporter sup = {0};
sup.numOfCols = numOfOrderCols;
- sup.pTagSchema = pTagSchema;
+ sup.pTagSchema = pTagSchema->pSchema;
sup.pCols = pCols;
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
@@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
//NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
- STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
+ SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
- assert(false);
- int32_t ret = 0;//getAllTableList(pTable, res);
+ int32_t ret = getAllTableList(tsdb->pMeta, uid, res);
if (ret != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro
return TSDB_CODE_SUCCESS;
}
-
+#endif
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) {
return NULL;
@@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
return NULL;
}
+
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) {
@@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
tfree(pTsdbReadHandle);
}
+#if 0
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index adb305ab09..255b000d10 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -649,6 +649,6 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
-int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
+int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index 1f5d0cd059..df28972645 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -13,10 +13,11 @@
* along with this program. If not, see .
*/
-#include "os.h"
-#include "tarray.h"
+#include
#include "dataSinkMgt.h"
#include "exception.h"
+#include "os.h"
+#include "tarray.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
@@ -72,7 +73,45 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
assert(tsdb != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
- int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
+ int32_t code = 0;
+ uint64_t uid = 0;
+ STimeWindow window = TSWINDOW_INITIALIZER;
+ int32_t tableType = 0;
+
+ SPhyNode *pPhyNode = pSubplan->pNode;
+ if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) {
+ STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode;
+ uid = pTableScanNode->scan.uid;
+ window = pTableScanNode->window;
+ tableType = pTableScanNode->scan.tableType;
+ } else {
+ assert(0);
+ }
+
+ STableGroupInfo groupInfo = {0};
+ if (tableType == TSDB_SUPER_TABLE) {
+ code = tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId);
+ if (code != TSDB_CODE_SUCCESS) {
+ goto _error;
+ }
+ } else { // Create one table group.
+ groupInfo.numOfTables = 1;
+ groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES);
+
+ SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
+
+ STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
+ taosArrayPush(pa, &info);
+ taosArrayPush(groupInfo.pGroupList, &pa);
+ }
+
+ if (groupInfo.numOfTables == 0) {
+ code = 0;
+// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
+ goto _error;
+ }
+
+ code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -141,6 +180,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
+ // todo: remove it.
+ if (tinfo == NULL) {
+ return NULL;
+ }
+
*pRes = NULL;
int64_t curOwner = 0;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index b6df0c527e..9daf3298e4 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -7369,15 +7369,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
}
}
-int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
+int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle) {
STsdbQueryCond cond = {.loadExternalRows = false};
- uint64_t uid = 0;
SPhyNode* pPhyNode = pPlan->pNode;
if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) {
STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode;
- uid = pTableScanNode->scan.uid;
cond.order = pTableScanNode->scan.order;
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
@@ -7397,15 +7395,8 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
assert(0);
}
- STableGroupInfo group = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES)};
- SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
- STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
- taosArrayPush(pa, &info);
-
- taosArrayPush(group.pGroupList, &pa);
-
*pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId);
- tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL);
+ tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL);
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle);
if ((*pTaskInfo)->pRoot == NULL) {
diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c
index c7a4e438ba..9aeae55c4f 100644
--- a/source/libs/planner/src/physicalPlanJson.c
+++ b/source/libs/planner/src/physicalPlanJson.c
@@ -558,11 +558,15 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
static bool timeWindowFromJson(const cJSON* json, void* obj) {
STimeWindow* win = (STimeWindow*)obj;
- char* p = getString(json, jkTimeWindowStartKey);
- win->skey = strtoll(p, NULL, 10);
+ char* pStartKey = getString(json, jkTimeWindowStartKey);
+ win->skey = strtoll(pStartKey, NULL, 10);
+
+ char* pEndKey = getString(json, jkTimeWindowEndKey);
+ win->ekey = strtoll(pEndKey, NULL, 10);
+
+ tfree(pStartKey);
+ tfree(pEndKey);
- p = getString(json, jkTimeWindowEndKey);
- win->ekey = strtoll(p, NULL, 10);
return true;
}
@@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse";
static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
- bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid);
+
+ char uid[40] = {0};
+ snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid);
+ bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid);
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
}
+
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
}
+
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
}
@@ -589,12 +598,17 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
}
+
return res;
}
static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* pNode = (SScanPhyNode*)obj;
- pNode->uid = getNumber(json, jkScanNodeTableId);
+
+ char* val = getString(json, jkScanNodeTableId);
+ pNode->uid = strtoull(val, NULL, 10);
+ tfree(val);
+
pNode->tableType = getNumber(json, jkScanNodeTableType);
pNode->count = getNumber(json, jkScanNodeTableCount);
pNode->order = getNumber(json, jkScanNodeTableOrder);
@@ -726,7 +740,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
}
if (res) {
- res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr));
+ res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, sizeof(SEpAddr), ep->numOfEps);
}
return res;
}
diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt
index a3db9c6992..9ada451c61 100644
--- a/source/libs/qworker/CMakeLists.txt
+++ b/source/libs/qworker/CMakeLists.txt
@@ -1,15 +1,4 @@
aux_source_directory(src QWORKER_SRC)
-#add_library(qworker ${QWORKER_SRC})
-#target_include_directories(
-# qworker
-# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
-# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
-#)
-#
-#target_link_libraries(
-# qworker
-# PRIVATE os util transport planner qcom executor
-#)
add_library(qworker STATIC ${QWORKER_SRC})
target_include_directories(
@@ -18,11 +7,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
-#set_target_properties(qworker PROPERTIES
-# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a"
-# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker"
-# )
-
target_link_libraries(qworker
PRIVATE os util transport planner qcom executor
)
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index 661beee5d5..6b047eb96e 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -29,7 +29,7 @@ extern "C" {
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
-#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
+#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
SCH_READ = 1,
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index c53926f8c1..ffb602bd36 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -26,9 +26,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
pTask->level = pLevel;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
- pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
+ pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->execAddrs) {
- SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
+ SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -383,9 +383,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
pTask->candidateIdx = 0;
- pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
+ pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->candidateAddrs) {
- SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
+ SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -405,10 +405,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (pJob->nodeList) {
nodeNum = taosArrayGetSize(pJob->nodeList);
- for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
+ for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
- if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
+ if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -418,12 +418,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
if (addNum <= 0) {
- SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
+ SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum);
return TSDB_CODE_QRY_INVALID_INPUT;
}
/*
- for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
+ for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
@@ -734,7 +734,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
/*
- if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
+ if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
@@ -1165,6 +1165,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code);
}
+
+ printf("physical plan:%s\n", pTask->msg);
}
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));