diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index fc9a51af65..01172ae9c9 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,3 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} # [Optional] Uncomment this section to install additional packages. # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends +RUN apt-get update && apt-get -y install tree vim diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index f069b68286..ba3539e64e 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -114,10 +114,16 @@ typedef struct SProjectPhyNode { SPhyNode node; } SProjectPhyNode; +typedef struct SDownstreamSource { + SQueryNodeAddr addr; + uint64_t taskId; + uint64_t schedId; +} SDownstreamSource; + typedef struct SExchangePhyNode { SPhyNode node; - uint64_t srcTemplateId; // template id of datasource suplans - SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode + uint64_t srcTemplateId; // template id of datasource suplans + SArray *pSrcEndPoints; // SArray, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; typedef enum EAggAlgo { @@ -178,7 +184,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b6f38b12ca..1d10869e30 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -179,7 +179,7 @@ extern int32_t clientConnRefPool; extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); -SMsgSendInfo* buildMsgInfoImpl(SRequestObj*); +SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); int taos_init(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 51f267e884..75b652e40d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -195,8 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { } asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); } else { - SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); } tsem_wait(&pRequest->body.rspSem); @@ -257,7 +256,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { return pRequest->code; } - return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); + SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); + + SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2}; + addr.epAddr[0].port = 6030; + strcpy(addr.epAddr[0].fqdn, "localhost"); + + taosArrayPush(execNode, &addr); + return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); } typedef struct tmq_t tmq_t; @@ -699,6 +705,8 @@ void* doFetchRow(SRequestObj* pRequest) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + SEpSet epSet = {0}; + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pRequest->type == TDMT_VND_QUERY) { // All data has returned to App already, no need to try again @@ -706,9 +714,13 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); - setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); + int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + return NULL; + } + setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); if (pResultInfo->numOfRows == 0) { return NULL; } @@ -718,6 +730,17 @@ void* doFetchRow(SRequestObj* pRequest) { pRequest->type = TDMT_MND_SHOW_RETRIEVE; } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; + SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; + SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); + + epSet.numOfEps = pVgroupInfo->numOfEps; + epSet.inUse = pVgroupInfo->inUse; + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); + epSet.port[i] = pVgroupInfo->epAddr[i].port; + } + } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { pRequest->type = TDMT_VND_SHOW_TABLES; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; @@ -735,9 +758,17 @@ void* doFetchRow(SRequestObj* pRequest) { SMsgSendInfo* body = buildMsgInfoImpl(pRequest); + epSet.numOfEps = pVgroupInfo->numOfEps; + epSet.inUse = pVgroupInfo->inUse; + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i])); + epSet.port[i] = pVgroupInfo->epAddr[i].port; + } + int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; @@ -747,7 +778,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 497ef1ac95..02e36043dc 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -115,7 +115,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { } } else { assert(pRequest != NULL); - pMsgSendInfo->msgInfo = pRequest->body.requestMsg; + pMsgSendInfo->msgInfo = pRequest->body.requestMsg; } pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..4361131f01 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,7 +49,7 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, connect_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } @@ -295,24 +295,24 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -TEST(testCase, create_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); - ASSERT_EQ(taos_errno(pRes), 0); - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); - ASSERT_NE(taos_errno(pRes), 0); - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, create_table_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); +// ASSERT_EQ(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); +// ASSERT_NE(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// taos_close(pConn); +//} //TEST(testCase, create_ctable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) { // taos_close(pConn); //} -TEST(testCase, show_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); - -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +//TEST(testCase, show_stable_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != nullptr); +// +//// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +// +// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); // if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); // } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// // taos_free_result(pRes); - - TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); - if (taos_errno(pRes) != 0) { - printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} +// taos_close(pConn); +//} // //TEST(testCase, show_vgroup_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -621,7 +621,7 @@ TEST(testCase, create_topic_Test) { // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "select ts,k from m1"); +// pRes = taos_query(pConn, "select ts from m1"); // if (taos_errno(pRes) != 0) { // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 031ed178f0..383073871e 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -42,7 +42,8 @@ typedef struct { SSchema *pSchema; } SSchemaWrapper; -typedef struct SMTbCursor SMTbCursor; +typedef struct SMTbCursor SMTbCursor; +typedef struct SMCtbCursor SMCtbCursor; typedef SVCreateTbReq STbCfg; @@ -64,6 +65,10 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); char * metaTbCursorNext(SMTbCursor *pTbCur); +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); + // Options void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsClear(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index cfd8e501f6..362ad9b2d8 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -93,6 +93,7 @@ int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); typedef void* tsdbReadHandleT; + /** * Get the data block iterator, starting from position according to the query condition * @@ -124,6 +125,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle); +/** + * + * @param tsdb + * @param uid + * @param skey + * @param pTagCond + * @param len + * @param tagNameRelType + * @param tbnameCond + * @param pGroupInfo + * @param pColIndex + * @param numOfCols + * @param reqId + * @return + */ +int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, + int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, + SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId); /** * get num of rows in mem table * diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index ae13d9fddc..d8ee48ff23 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -374,22 +374,27 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey DBT * pDbt; if (pTbCfg->type == META_CHILD_TABLE) { - pDbt = calloc(2, sizeof(DBT)); + // pDbt = calloc(2, sizeof(DBT)); - // First key is suid - pDbt[0].data = &(pTbCfg->ctbCfg.suid); - pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); + // // First key is suid + // pDbt[0].data = &(pTbCfg->ctbCfg.suid); + // pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); - // Second key is the first tag - void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); - pDbt[1].data = pTagVal; - pDbt[1].size = sizeof(int32_t); + // // Second key is the first tag + // void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); + // pDbt[1].data = pTagVal; + // pDbt[1].size = sizeof(int32_t); // Set index key memset(pSKey, 0, sizeof(*pSKey)); +#if 0 pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; pSKey->data = pDbt; pSKey->size = 2; +#else + pSKey->data = &(pTbCfg->ctbCfg.suid); + pSKey->size = sizeof(pTbCfg->ctbCfg.suid); +#endif return 0; } else { @@ -621,4 +626,61 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { tdDestroyTSchemaBuilder(&sb); return pTSchema; +} + +struct SMCtbCursor { + DBC * pCur; + tb_uid_t suid; +}; + +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { + SMCtbCursor *pCtbCur = NULL; + SMetaDB * pDB = pMeta->pDB; + int ret; + + pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur)); + if (pCtbCur == NULL) { + return NULL; + } + + pCtbCur->suid = uid; + ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0); + if (ret != 0) { + free(pCtbCur); + return NULL; + } + + return pCtbCur; +} + +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { + if (pCtbCur) { + if (pCtbCur->pCur) { + pCtbCur->pCur->close(pCtbCur->pCur); + } + + free(pCtbCur); + } +} + +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; + void * pBuf; + STbCfg tbCfg; + + // Set key + skey.data = &(pCtbCur->suid); + skey.size = sizeof(pCtbCur->suid); + + if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { + tb_uid_t id = *(tb_uid_t *)pkey.data; + assert(id != 0); + return id; + // metaDecodeTbInfo(pBuf, &tbCfg); + // return tbCfg.; + } else { + return 0; + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d07a5ffc77..15748118d7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -152,7 +152,7 @@ typedef struct STsdbReadHandle { typedef struct STableGroupSupporter { int32_t numOfCols; SColIndex* pCols; - STSchema* pTagSchema; + SSchema* pTagSchema; } STableGroupSupporter; static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); @@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, return (tsdbReadHandleT)pReadHandle; _end: -// tsdbCleanupQueryHandle(pTsdbReadHandle); + tsdbCleanupQueryHandle(pReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int return numOfRows; } -static int32_t getAllTableList(STable* pSuperTable, SArray* list) { - SSkipListIterator* iter = NULL;//tSkipListCreateIter(pSuperTable->pIndex); - while (tSkipListIterNext(iter)) { - SSkipListNode* pNode = tSkipListIterGet(iter); +static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { + SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); - STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode); + while (1) { + tb_uid_t id = metaCtbCursorNext(pCur); + if (id == 0) { + break; + } - STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL}; + STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id}; taosArrayPush(list, &info); } - tSkipListDestroyIter(iter); + metaCloseCtbCurosr(pCur); return TSDB_CODE_SUCCESS; } @@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable taosArrayPush(pGroups, &g); } -SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { +SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { assert(pTableList != NULL); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); @@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC } if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table - SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo)); + SArray* sa = taosArrayDup(pTableList); if (sa == NULL) { taosArrayDestroy(pTableGroup); return NULL; } - for(int32_t i = 0; i < size; ++i) { - STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); - - STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; - taosArrayPush(sa, &info); - } - taosArrayPush(pTableGroup, &sa); tsdbDebug("all %" PRIzu " tables belong to one group", size); } else { STableGroupSupporter sup = {0}; sup.numOfCols = numOfOrderCols; - sup.pTagSchema = pTagSchema; + sup.pTagSchema = pTagSchema->pSchema; sup.pCols = pCols; // taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn); @@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch //NOTE: not add ref count for super table SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); + SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { - assert(false); - int32_t ret = 0;//getAllTableList(pTable, res); + int32_t ret = getAllTableList(tsdb->pMeta, uid, res); if (ret != TSDB_CODE_SUCCESS) { goto _error; } @@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro return TSDB_CODE_SUCCESS; } - +#endif static void* doFreeColumnInfoData(SArray* pColumnInfoData) { if (pColumnInfoData == NULL) { return NULL; @@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { return NULL; } + void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; if (pTsdbReadHandle == NULL) { @@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { tfree(pTsdbReadHandle); } +#if 0 void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { assert(pGroupList != NULL); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index adb305ab09..beb26572d0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -375,9 +375,12 @@ typedef struct STaskParam { } STaskParam; typedef struct SExchangeInfo { - int32_t numOfSources; - SEpSet *pEpset; - int32_t bytes; // total load bytes from remote + SArray *pSources; + int32_t bytes; // total load bytes from remote + tsem_t ready; + void *pTransporter; + SRetrieveTableRsp *pRsp; + SSDataBlock *pResult; } SExchangeInfo; typedef struct STableScanInfo { @@ -545,7 +548,7 @@ typedef struct SOrderOperatorInfo { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); -SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); @@ -649,6 +652,6 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); +int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle); #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 1f5d0cd059..56e2977753 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tarray.h" +#include #include "dataSinkMgt.h" #include "exception.h" +#include "os.h" +#include "tarray.h" #include "tcache.h" #include "tglobal.h" #include "tmsg.h" @@ -72,7 +73,46 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); + int32_t code = 0; + uint64_t uid = 0; + STimeWindow window = TSWINDOW_INITIALIZER; + int32_t tableType = 0; + + SPhyNode* pPhyNode = pSubplan->pNode; + STableGroupInfo groupInfo = {0}; + + int32_t type = pPhyNode->info.type; + if (type == OP_TableScan || type == OP_DataBlocksOptScan) { + STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode; + uid = pTableScanNode->scan.uid; + window = pTableScanNode->window; + tableType = pTableScanNode->scan.tableType; + + if (tableType == TSDB_SUPER_TABLE) { + code = + tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } else { // Create one table group. + groupInfo.numOfTables = 1; + groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + + SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); + + STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; + taosArrayPush(pa, &info); + taosArrayPush(groupInfo.pGroupList, &pa); + } + + if (groupInfo.numOfTables == 0) { + code = 0; + // qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId); + goto _error; + } + } + + code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -87,7 +127,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ *handle = (*pTask)->dsHandle; - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } @@ -141,6 +181,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + // todo: remove it. + if (tinfo == NULL) { + return TSDB_CODE_SUCCESS; + } + *pRes = NULL; int64_t curOwner = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b6df0c527e..27c24669cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4558,6 +4558,7 @@ void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); +void createResultBlock(const SArray* pExprInfo, SExchangeInfo* pInfo, const SOperatorInfo* pOperator, size_t size); static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0 @@ -4914,57 +4915,127 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { } int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SExchangeInfo* pEx = (SExchangeInfo*) param; + pEx->pRsp = pMsg->pData; + pEx->pRsp->numOfRows = htonl(pEx->pRsp->numOfRows); + pEx->pRsp->useconds = htobe64(pEx->pRsp->useconds); + pEx->pRsp->compLen = htonl(pEx->pRsp->compLen); + + tsem_post(&pEx->ready); +} + +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { + assert(pMsgBody != NULL); + tfree(pMsgBody->msgInfo.pData); + tfree(pMsgBody); +} + +void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { + SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + assert(pMsg->ahandle != NULL); + + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; + + if (pMsg->contLen > 0) { + buf.pData = calloc(1, pMsg->contLen); + if (buf.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(buf.pData, pMsg->pCont, pMsg->contLen); + } + } + + pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); + rpcFreeCont(pMsg->pCont); + destroySendMsgInfo(pSendInfo); } static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo *pOperator = (SOperatorInfo*) param; SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; *newgroup = false; + if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { + return NULL; + } SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); if (NULL == pMsg) { // todo handle malloc error - + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } - SEpSet epSet; + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); + SEpSet epSet = {0}; - int64_t sId = -1, queryId = 0, taskId = 1, vgId = 1; - pMsg->header.vgId = htonl(vgId); + epSet.numOfEps = pSource->addr.numOfEps; + epSet.port[0] = pSource->addr.epAddr[0].port; + tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); - pMsg->sId = htobe64(sId); - pMsg->taskId = htobe64(taskId); - pMsg->queryId = htobe64(queryId); + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", queryId, taskId, (int32_t)sizeof(SMsgSendInfo)); + qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; } - pMsgSendInfo->param = NULL; + pMsgSendInfo->param = pExchangeInfo; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgType = TDMT_VND_FETCH; pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - void* pTransporter = NULL; - int32_t code = asyncSendMsgToServer(pTransporter, &epSet, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + tsem_wait(&pExchangeInfo->ready); - printf("abc\n"); - getchar(); + if (pExchangeInfo->pRsp->numOfRows == 0) { + return NULL; + } - // add it into the sink node + SSDataBlock* pRes = pExchangeInfo->pResult; + char* pData = pExchangeInfo->pRsp->data; + for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); + if (tmp == NULL) { + goto _error; + } + + size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; + memcpy(tmp, pData, len); + + pColInfoData->pData = tmp; + pData += len; + } + + pRes->info.numOfCols = pOperator->numOfOutput; + pRes->info.rows = pExchangeInfo->pRsp->numOfRows; + + return pExchangeInfo->pResult; + + _error: + tfree(pMsg); + tfree(pMsgSendInfo); + + terrno = pTaskInfo->code; + return NULL; } -SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { - assert(numOfSources > 0); +static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -4975,21 +5046,71 @@ SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t n return NULL; } - pInfo->numOfSources = numOfSources; + pInfo->pSources = taosArrayDup(pSources); + assert(taosArrayGetSize(pInfo->pSources) > 0); + + size_t size = taosArrayGetSize(pExprInfo); + pInfo->pResult = createResultDataBlock(pExprInfo); pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfOutput = size; pOperator->pRuntimeEnv = NULL; pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; + { // todo refactor + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "TSC"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processRspMsg; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; +// rpcInit.spi = 1; + rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; + + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } + } + return pOperator; } +SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { + SSDataBlock* pResBlock = calloc(1, sizeof(SSDataBlock)); + if (pResBlock == NULL) { + return NULL; + } + + size_t numOfCols = taosArrayGetSize(pExprInfo); + pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + SArray* pResult = pResBlock->pDataBlock; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfoData = {0}; + SExprInfo* p = taosArrayGetP(pExprInfo, i); + + SSchema* pSchema = &p->base.resSchema; + colInfoData.info.type = pSchema->type; + colInfoData.info.colId = pSchema->colId; + colInfoData.info.bytes = pSchema->bytes; + + taosArrayPush(pResult, &colInfoData); + } + + return pResBlock; +} + SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0 && numOfOutput > 0); @@ -5016,7 +5137,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = NULL; pOperator->exec = doTableScan; pOperator->pTaskInfo = pTaskInfo; @@ -5049,7 +5169,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = NULL; pOperator->exec = doTableScan; pOperator->pTaskInfo = pTaskInfo; @@ -7363,50 +7482,47 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); return createDataBlocksOptScanInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + } else if (pPhyNode->info.type == OP_Exchange) { + SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; + return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else { assert(0); } } } -int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { +int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle) { STsdbQueryCond cond = {.loadExternalRows = false}; - uint64_t uid = 0; + tsdbReadHandleT tsdbReadHandle = NULL; + SPhyNode* pPhyNode = pPlan->pNode; if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { - - STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode; - uid = pTableScanNode->scan.uid; - cond.order = pTableScanNode->scan.order; + STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode; + cond.order = pTableScanNode->scan.order; cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets); - cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); - cond.twindow = pTableScanNode->window; - cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + cond.twindow = pTableScanNode->window; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; - for(int32_t i = 0; i < cond.numOfCols; ++i) { + for (int32_t i = 0; i < cond.numOfCols; ++i) { SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i); assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE); SSchema* pSchema = pExprInfo->pExpr->pSchema; - cond.colList[i].type = pSchema->type; + cond.colList[i].type = pSchema->type; cond.colList[i].bytes = pSchema->bytes; cond.colList[i].colId = pSchema->colId; } + + *pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId); + tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL); + } else if (pPhyNode->info.type == OP_Exchange) { + *pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId); } else { assert(0); } - STableGroupInfo group = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES)}; - SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); - STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; - taosArrayPush(pa, &info); - - taosArrayPush(group.pGroupList, &pa); - - *pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId); - tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL); - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); if ((*pTaskInfo)->pRoot == NULL) { return terrno; diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 955507ae1b..3b7b9e44d9 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -768,6 +768,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER) ? TDMT_MND_CREATE_USER : TDMT_MND_ALTER_USER; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -809,6 +810,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT) ? TDMT_MND_CREATE_ACCT : TDMT_MND_ALTER_ACCT; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -816,6 +818,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch case TSDB_SQL_DROP_USER: { pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen); pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT) ? TDMT_MND_DROP_ACCT : TDMT_MND_DROP_USER; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -852,6 +855,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)pUseDbMsg; pDcl->msgLen = sizeof(SUseDbReq); pDcl->msgType = TDMT_MND_USE_DB; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -880,6 +884,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch goto _error; } + pDcl->epSet = pCtx->mgmtEpSet; pDcl->pMsg = (char*)pCreateMsg; pDcl->msgLen = sizeof(SCreateDbReq); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB; @@ -908,6 +913,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgLen = sizeof(SDropDbReq); pDcl->pMsg = (char*)pDropDbMsg; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -920,6 +926,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = TDMT_MND_CREATE_STB; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -940,6 +947,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch } pDcl->msgType = TDMT_MND_CREATE_DNODE; + pDcl->epSet = pCtx->mgmtEpSet; break; } @@ -950,6 +958,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch } pDcl->msgType = TDMT_MND_DROP_DNODE; + pDcl->epSet = pCtx->mgmtEpSet; break; } diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index be266bd415..4ff8364198 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); -void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 93f72bba95..ae058c1f85 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ -#include "function.h" #include "os.h" #include "parser.h" +#include "function.h" #include "plannerInt.h" typedef struct SFillEssInfo { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 422233eed7..1e1a97df1f 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -278,7 +278,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) { SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode)); node->srcTemplateId = srcTemplateId; - node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr))); + node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource))); return (SPhyNode*)node; } @@ -409,24 +409,25 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) { +void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) { if (NULL == pNode) { return; } if (OP_Exchange == pNode->info.type) { SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode; if (templateId == pExchange->srcTemplateId) { - taosArrayPush(pExchange->pSrcEndPoints, pEp); + taosArrayPush(pExchange->pSrcEndPoints, pSource); } } + if (pNode->pChildren != NULL) { size_t size = taosArrayGetSize(pNode->pChildren); for(int32_t i = 0; i < size; ++i) { - setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i)); + setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i)); } } } -void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) { - setExchangSourceNode(templateId, pEp, subplan->pNode); +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { + setExchangSourceNode(templateId, pSource, subplan->pNode); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index c7a4e438ba..db1b1c84d1 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -29,6 +29,14 @@ static void copyString(const cJSON* json, const char* name, char* dst) { strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); } +static uint64_t getBigintFromString(const cJSON* json, const char* name) { + char* val = getString(json, name); + uint64_t intVal = strtoul(val, NULL, 10); + tfree(val); + + return intVal; +} + static int64_t getNumber(const cJSON* json, const char* name) { double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); return (int64_t) d; @@ -543,13 +551,13 @@ static const char* jkTimeWindowEndKey = "EndKey"; static bool timeWindowToJson(const void* obj, cJSON* json) { const STimeWindow* win = (const STimeWindow*)obj; - char tmp[32] = {0}; - sprintf(tmp, "%"PRId64, win->skey); + char tmp[40] = {0}; + snprintf(tmp, tListLen(tmp),"%"PRId64, win->skey); bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp); if (res) { memset(tmp, 0, tListLen(tmp)); - sprintf(tmp, "%"PRId64, win->ekey); + snprintf(tmp, tListLen(tmp),"%"PRId64, win->ekey); res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp); } return res; @@ -557,12 +565,8 @@ static bool timeWindowToJson(const void* obj, cJSON* json) { static bool timeWindowFromJson(const cJSON* json, void* obj) { STimeWindow* win = (STimeWindow*)obj; - - char* p = getString(json, jkTimeWindowStartKey); - win->skey = strtoll(p, NULL, 10); - - p = getString(json, jkTimeWindowEndKey); - win->ekey = strtoll(p, NULL, 10); + win->skey = getBigintFromString(json, jkTimeWindowStartKey); + win->ekey = getBigintFromString(json, jkTimeWindowEndKey); return true; } @@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse"; static bool scanNodeToJson(const void* obj, cJSON* json) { const SScanPhyNode* pNode = (const SScanPhyNode*)obj; - bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid); + + char uid[40] = {0}; + snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid); + bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid); if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType); } + if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order); } + if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count); } @@ -589,12 +598,14 @@ static bool scanNodeToJson(const void* obj, cJSON* json) { if (res) { res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse); } + return res; } static bool scanNodeFromJson(const cJSON* json, void* obj) { SScanPhyNode* pNode = (SScanPhyNode*)obj; - pNode->uid = getNumber(json, jkScanNodeTableId); + + pNode->uid = getBigintFromString(json, jkScanNodeTableId); pNode->tableType = getNumber(json, jkScanNodeTableType); pNode->count = getNumber(json, jkScanNodeTableCount); pNode->order = getNumber(json, jkScanNodeTableOrder); @@ -715,40 +726,72 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { return true; } -static const char* jkNodeAddrId = "NodeId"; -static const char* jkNodeAddrInUse = "InUse"; -static const char* jkNodeAddrEpAddrs = "EpAddrs"; +static const char* jkNodeAddrId = "NodeId"; +static const char* jkNodeAddrInUse = "InUse"; +static const char* jkNodeAddrEpAddrs = "Ep"; +static const char* jkNodeAddr = "NodeAddr"; +static const char* jkNodeTaskId = "TaskId"; +static const char* jkNodeTaskSchedId = "SchedId"; + +static bool queryNodeAddrToJson(const void* obj, cJSON* json) { + const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*) obj; + bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId); + + if (res) { + res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse); + } + + if (res) { + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps); + } + return res; +} + +static bool queryNodeAddrFromJson(const cJSON* json, void* obj) { + SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj; + + pAddr->nodeId = getNumber(json, jkNodeAddrId); + pAddr->inUse = getNumber(json, jkNodeAddrInUse); + + int32_t numOfEps = 0; + bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps); + pAddr->numOfEps = numOfEps; + return res; +} static bool nodeAddrToJson(const void* obj, cJSON* json) { - const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj; - bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId); + const SDownstreamSource* pSource = (const SDownstreamSource*) obj; + bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId); + if (res) { - res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); + char t[30] = {0}; + snprintf(t, tListLen(t), "%"PRIu64, pSource->schedId); + res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t); } + if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr)); + res = addObject(json, jkNodeAddr, queryNodeAddrToJson, &pSource->addr); } return res; } static bool nodeAddrFromJson(const cJSON* json, void* obj) { - SQueryNodeAddr* ep = (SQueryNodeAddr*)obj; - ep->nodeId = getNumber(json, jkNodeAddrId); - ep->inUse = getNumber(json, jkNodeAddrInUse); - int32_t numOfEps = 0; - bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps); - ep->numOfEps = numOfEps; + SDownstreamSource* pSource = (SDownstreamSource*)obj; + pSource->taskId = getNumber(json, jkNodeTaskId); + + pSource->schedId = getBigintFromString(json, jkNodeTaskSchedId); + bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true); return res; } static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; -static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints"; +static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs"; static bool exchangeNodeToJson(const void* obj, cJSON* json) { const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); if (res) { - res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints); + res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData, sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints)); } return res; } @@ -756,7 +799,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) { SExchangePhyNode* exchange = (SExchangePhyNode*)obj; exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); - return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr)); + return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SDownstreamSource)); } static bool specificPhyNodeToJson(const void* obj, cJSON* json) { @@ -965,7 +1008,11 @@ static const char* jkIdSubplanId = "SubplanId"; static bool subplanIdToJson(const void* obj, cJSON* jId) { const SSubplanId* id = (const SSubplanId*)obj; - bool res = cJSON_AddNumberToObject(jId, jkIdQueryId, id->queryId); + + char ids[40] = {0}; + snprintf(ids, tListLen(ids), "%"PRIu64, id->queryId); + + bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids); if (res) { res = cJSON_AddNumberToObject(jId, jkIdTemplateId, id->templateId); } @@ -977,9 +1024,10 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) { static bool subplanIdFromJson(const cJSON* json, void* obj) { SSubplanId* id = (SSubplanId*)obj; - id->queryId = getNumber(json, jkIdQueryId); + + id->queryId = getBigintFromString(json, jkIdQueryId); id->templateId = getNumber(json, jkIdTemplateId); - id->subplanId = getNumber(json, jkIdSubplanId); + id->subplanId = getNumber(json, jkIdSubplanId); return true; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index bf815b26b2..9b32213ad7 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { - char* str = NULL; - queryPlanToString(pLogicPlan, &str); - printf("%s\n", str); +// char* str = NULL; +// queryPlanToString(pLogicPlan, &str); +// printf("%s\n", str); } code = optimizeQueryPlan(pLogicPlan); @@ -87,8 +87,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return TSDB_CODE_SUCCESS; } -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { - setSubplanExecutionNode(subplan, templateId, ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { + setSubplanExecutionNode(subplan, templateId, pSource); } int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) { diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index a3db9c6992..9ada451c61 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -1,15 +1,4 @@ aux_source_directory(src QWORKER_SRC) -#add_library(qworker ${QWORKER_SRC}) -#target_include_directories( -# qworker -# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker" -# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -#) -# -#target_link_libraries( -# qworker -# PRIVATE os util transport planner qcom executor -#) add_library(qworker STATIC ${QWORKER_SRC}) target_include_directories( @@ -18,11 +7,6 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -#set_target_properties(qworker PROPERTIES -# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a" -# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker" -# ) - target_link_libraries(qworker PRIVATE os util transport planner qcom executor ) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 57bb73c500..50a6158631 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -271,6 +271,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); + printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId); + SQWTaskCtx nctx = {0}; QW_LOCK(QW_WRITE, &mgmt->ctxLock); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 661beee5d5..6b047eb96e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,7 +29,7 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 -#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA +#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA enum { SCH_READ = 1, diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index de295f77c6..45c6936b62 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -52,9 +52,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel * pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); - pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->execAddrs) { - SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -66,7 +66,7 @@ void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } - tfree(pTask->msg); + tfree(pTask->msg); if (pTask->children) { taosArrayDestroy(pTask->children); @@ -408,9 +408,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } pTask->candidateIdx = 0; - pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->candidateAddrs) { - SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -430,10 +430,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -443,12 +443,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } if (addNum <= 0) { - SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); + SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum); return TSDB_CODE_QRY_INVALID_INPUT; } /* - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; @@ -767,7 +767,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } /* - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; @@ -782,7 +782,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); - qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr); + SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; + qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(par)) { @@ -853,7 +854,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SResReadyRsp *rsp = (SResReadyRsp *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -879,7 +880,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); } - + SCH_ERR_JRET(schProcessOnDataFetched(pJob)); break; @@ -1214,6 +1215,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); SCH_ERR_JRET(code); } + + printf("physical plan:%s\n", pTask->msg); } SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); @@ -1477,7 +1480,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSubQueryMsg *pMsg = msg; + SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; pMsg->header.vgId = htonl(tInfo.addr.nodeId); @@ -1512,7 +1515,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { } int32_t code = 0; - + *dst = taosArrayInit(copyNum, sizeof(STaskInfo)); if (NULL == *dst) { qError("taosArrayInit %d taskInfo failed", copyNum); @@ -1523,7 +1526,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { STaskInfo info = {0}; info.addr = src->addr; - + for (int32_t i = 0; i < copyNum; ++i) { info.msg = malloc(msgSize); if (NULL == info.msg) { @@ -1534,7 +1537,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { memcpy(info.msg, src->msg, msgSize); info.msg->taskId = schGenUUID(); - + if (NULL == taosArrayPush(*dst, &info)) { qError("taosArrayPush failed, idx:%d", i); free(info.msg); @@ -1548,7 +1551,7 @@ _return: schedulerFreeTaskList(*dst); *dst = NULL; - + SCH_RET(code); } @@ -1668,7 +1671,7 @@ void scheduleFreeJob(void *job) { schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); setJobFree = true; } - + usleep(1); } else { assert(0); diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 905b08ee0b..eee8f8ed33 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -44,7 +44,7 @@ typedef struct { // TDB Operations TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type); -TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags); +TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags); TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags); #ifdef __cplusplus diff --git a/source/libs/tdb/src/db/tdbDB.c b/source/libs/tdb/src/db/tdbDB.c index 2af40d8642..eaf85ea4a1 100644 --- a/source/libs/tdb/src/db/tdbDB.c +++ b/source/libs/tdb/src/db/tdbDB.c @@ -56,9 +56,28 @@ _err: return 0; } -TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) { - // TODO - return 0; +TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags) { + int ret = 0; + + if ((dbp->fname = strdup(fname)) == NULL) { + ret = -1; + return ret; + } + + // Create the backup file if the file not exists + + // Open the file as a sub-db or a master-db + if (dbname) { + if ((dbp->dbname = strdup(dbname)) == NULL) { + ret = -1; + return ret; + } + // TODO: Open the DB as a SUB-DB in this file + } else { + // TODO: Open the DB as a MASTER-DB in this file + } + + return ret; } TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) { diff --git a/source/libs/tdb/src/inc/tdbDB.h b/source/libs/tdb/src/inc/tdbDB.h index fca197dc39..d0ef9e22d0 100644 --- a/source/libs/tdb/src/inc/tdbDB.h +++ b/source/libs/tdb/src/inc/tdbDB.h @@ -25,6 +25,14 @@ extern "C" { #endif +typedef struct { + // TODO +} TDB_MPOOL; + +typedef struct { + int fd; +} TDB_FH; + struct TDB { pgsize_t pageSize; tdb_db_t type; @@ -35,6 +43,9 @@ struct TDB { TDB_HASH * hash; TDB_HEAP * heap; } dbam; // db access method + + TDB_FH * fhp; // The backup file handle + TDB_MPOOL *mph; // The memory pool handle }; #ifdef __cplusplus diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 38c3b0b917..f27e17f1ca 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) { int ret; TDB *dbp; - tdbCreateDB(&dbp, TDB_BTREE_T); + // tdbCreateDB(&dbp, TDB_BTREE_T); - tdbOpenDB(dbp, 0); + // tdbOpenDB(dbp, 0); - tdbCloseDB(dbp, 0); + // tdbCloseDB(dbp, 0); } \ No newline at end of file diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 726247d450..469da11d93 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tconfig.h" -#include "ulog.h" #include "tutil.h" +#include "ulog.h" SGlobalCfg tsGlobalConfig[TSDB_CFG_MAX_NUM] = {{0}}; int32_t tsGlobalConfigNum = 0;