diff --git a/CMakeLists.txt b/CMakeLists.txt index cda71fb3bf..99e48006b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib") include(${CMAKE_SUPPORT_DIR}/cmake.options) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3") +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3") # contrib add_subdirectory(contrib) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 60c688d95b..4d33706303 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. 
@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 975b103538..fa1483de87 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param qnodeList Qnode address list, element is SEpAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows); + +/** + * Process the query job, generated according to the query physical plan. + * This is an asynchronous API, and is also thread-safe.
+ * @param qnodeList Qnode address list, element is SEpAddr + * @return + */ +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); @@ -79,4 +87,4 @@ void schedulerDestroy(void); } #endif -#endif /*_TD_SCHEDULER_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_H_*/ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 00fe769299..c7ea3c2e09 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -209,7 +209,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { - return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 7ca2822332..b882e1a42c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) { +int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; + *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + if (NULL == *vgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); while (pIter) { vgInfo = pIter; - if (NULL == taosArrayPush(vgroupList, vgInfo)) { + if (NULL == 
taosArrayPush(*vgroupList, vgInfo)) { ctgError("taosArrayPush failed"); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -295,14 +301,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == pCatalog->tableCache.cache) { - pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->tableCache.cache) { - ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - } - if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { @@ -329,7 +327,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out } } - if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, sizeof(*output->tbMeta)) != 0) { + int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); goto error_exit; } @@ -529,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) { if (NULL == pCatalog 
|| NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -549,17 +548,29 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { + *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == *pVgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } } + tfree(tbMeta); + + return TSDB_CODE_SUCCESS; + _return: tfree(tbMeta); + + taosArrayDestroy(*pVgroupList); CTG_RET(code); } @@ -634,8 +645,8 @@ _return: CTG_RET(code); } -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) { +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index aa5a8e2b3e..62279b9e1f 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -32,27 +32,28 @@ #include "stub.h" #include "addr_any.h" -typedef struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; -} SAppInstInfo; -typedef struct STscObj { - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char acctId[TSDB_ACCT_ID_LEN]; 
- char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - uint32_t connId; - uint64_t id; // ref ID returned by taosAddRef -// struct SSqlObj *sqlList; - void *pTransporter; - pthread_mutex_t mutex; // used to protect the operation on db - int32_t numOfReqs; // number of sqlObj from this tscObj - SAppInstInfo *pAppInfo; -} STscObj; namespace { +void ctgTestSetPrepareTableMeta(); +void ctgTestSetPrepareCTableMeta(); +void ctgTestSetPrepareSTableMeta(); + + +int32_t ctgTestVgNum = 10; +int32_t ctgTestColNum = 2; +int32_t ctgTestTagNum = 1; +int32_t ctgTestSVersion = 1; +int32_t ctgTestTVersion = 1; + +char *ctgTestClusterId = "cluster1"; +char *ctgTestDbname = "1.db1"; +char *ctgTestTablename = "table1"; +char *ctgTestCTablename = "ctable1"; +char *ctgTestSTablename = "stable1"; + + void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.db1"); @@ -88,22 +89,281 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { ASSERT_EQ(rpcRsp.code, 0); } -void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; //todo + pRsp->code =0; + pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (SUseDbRsp *)pRsp->pCont; + strcpy(rspMsg->db, ctgTestDbname); + rspMsg->vgVersion = htonl(1); + rspMsg->vgNum = htonl(ctgTestVgNum); + rspMsg->hashMethod = 0; + + SVgroupInfo *vg = NULL; + uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; + for (int32_t i = 0; i < ctgTestVgNum; ++i) { + vg = &rspMsg->vgroupInfo[i]; + + vg->vgId = htonl(i + 1); + vg->hashBegin = htonl(i * hashUnit); + vg->hashEnd = htonl(hashUnit * (i + 1) - 1); + vg->numOfEps = i % TSDB_MAX_REPLICA + 1; + vg->inUse = i % vg->numOfEps; + for (int32_t n = 0; n < vg->numOfEps; ++n) { + SEpAddrMsg *addr = &vg->epAddr[n]; + 
strcpy(addr->fqdn, "a0"); + addr->port = htons(n + 22); + } + } + + vg->hashEnd = htonl(UINT32_MAX); + return; } -void initTestEnv() { + + +void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename); + rspMsg->numOfTags = 0; + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_NORMAL_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = 0; + rspMsg->tuid = htobe64(0x0000000000000001); + rspMsg->vgId = htonl(8); + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1"); + + return; +} + + +void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename); + sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + rspMsg->numOfTags = htonl(ctgTestTagNum); + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_CHILD_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = htobe64(0x0000000000000002); + rspMsg->tuid = 
htobe64(0x0000000000000003); + rspMsg->vgId = htonl(9); + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = htonl(2); + s->bytes = htonl(12); + strcpy(s->name, "tag1s"); + + + return; +} + + +void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + rspMsg->numOfTags = htonl(ctgTestTagNum); + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_SUPER_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = htobe64(0x0000000000000002); + rspMsg->tuid = htobe64(0x0000000000000003); + rspMsg->vgId = 0; + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = htonl(2); + s->bytes = htonl(12); + strcpy(s->name, "tag1s"); + + + return; +} + +void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + 
ctgTestSetPrepareTableMeta(); + + return; +} + + +void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareCTableMeta(); + + return; +} + +void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareSTableMeta(); + + return; +} + + + +void ctgTestSetPrepareDbVgroups() { static Stub stub; - stub.set(rpcSendRecv, __rpcSendRecv); + stub.set(rpcSendRecv, ctgTestPrepareDbVgroups); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto& f : result) { - stub.set(f.second, __rpcSendRecv); + stub.set(f.second, ctgTestPrepareDbVgroups); + } + } +} + +void ctgTestSetPrepareTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareTableMeta); + } + } +} + +void ctgTestSetPrepareCTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareCTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareCTableMeta); + } + } +} + +void ctgTestSetPrepareSTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareSTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareSTableMeta); + } + } +} + +void ctgTestSetPrepareDbVgroupsAndNormalMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta); + { + AddrAny any("libtransport.so"); + std::map 
result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndNormalMeta); + } + } +} + + +void ctgTestSetPrepareDbVgroupsAndChildMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndChildMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndChildMeta); + } + } +} + +void ctgTestSetPrepareDbVgroupsAndSuperMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndSuperMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndSuperMeta); } } } @@ -111,33 +371,267 @@ void initTestEnv() { } -TEST(testCase, normalCase) { - STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - char *clusterId = "cluster1"; - char *dbname = "1.db1"; - char *tablename = "table1"; +TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + ctgTestSetPrepareDbVgroups(); + initQueryModuleMsgHandle(); - sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); - code = catalogGetHandle(clusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo); + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgInfo); ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.numOfEps, 3); - 
taos_close(pConn); + ctgTestSetPrepareTableMeta(); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + catalogDestroy(); } +TEST(tableMeta, childTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestSetPrepareDbVgroupsAndChildMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); 
+ ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + catalogDestroy(); +} + +TEST(tableMeta, superTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta); + ASSERT_EQ(code, 0); + 
ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + ctgTestSetPrepareCTableMeta(); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + + + catalogDestroy(); +} + +TEST(tableDistVgroup, normalTable) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndNormalMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 
0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 8); + ASSERT_EQ(vgInfo->numOfEps, 3); + + catalogDestroy(); +} + +TEST(tableDistVgroup, childTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndChildMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 9); + ASSERT_EQ(vgInfo->numOfEps, 4); + + + catalogDestroy(); +} + +TEST(tableDistVgroup, superTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 1); + 
ASSERT_EQ(vgInfo->numOfEps, 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); + ASSERT_EQ(vgInfo->vgId, 2); + ASSERT_EQ(vgInfo->numOfEps, 2); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); + ASSERT_EQ(vgInfo->vgId, 3); + ASSERT_EQ(vgInfo->numOfEps, 3); + + + catalogDestroy(); +} + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 9507b93222..34db262d5d 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -92,8 +92,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - pRsp->vgVersion = htonl(pRsp->vgVersion); - pRsp->vgNum = htonl(pRsp->vgNum); + pRsp->vgVersion = ntohl(pRsp->vgVersion); + pRsp->vgNum = ntohl(pRsp->vgNum); if (pRsp->vgNum < 0) { qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); @@ -115,12 +115,12 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { } for (int32_t i = 0; i < pRsp->vgNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin); - pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); + pRsp->vgroupInfo[i].vgId = ntohl(pRsp->vgroupInfo[i].vgId); + pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin); + pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd); for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port); + pRsp->vgroupInfo[i].epAddr[n].port = ntohs(pRsp->vgroupInfo[i].epAddr[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { @@ -142,13 +142,13 @@ _return: } static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { - pMetaMsg->numOfTags = 
htonl(pMetaMsg->numOfTags); - pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); - pMetaMsg->sversion = htonl(pMetaMsg->sversion); - pMetaMsg->tversion = htonl(pMetaMsg->tversion); + pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags); + pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); + pMetaMsg->sversion = ntohl(pMetaMsg->sversion); + pMetaMsg->tversion = ntohl(pMetaMsg->tversion); pMetaMsg->tuid = htobe64(pMetaMsg->tuid); pMetaMsg->suid = htobe64(pMetaMsg->suid); - pMetaMsg->vgId = htonl(pMetaMsg->vgId); + pMetaMsg->vgId = ntohl(pMetaMsg->vgId); if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags); @@ -179,8 +179,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags; for (int i = 0; i < numOfTotalCols; ++i) { - pSchema->bytes = htonl(pSchema->bytes); - pSchema->colId = htonl(pSchema->colId); + pSchema->bytes = ntohl(pSchema->bytes); + pSchema->colId = ntohl(pSchema->colId); pSchema++; } @@ -202,7 +202,8 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl qError("calloc size[%d] failed", metaSize); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - + + pTableMeta->vgId = isSuperTable ? 0 : msg->vgId; pTableMeta->tableType = isSuperTable ? 
TSDB_SUPER_TABLE : msg->tableType; pTableMeta->uid = msg->suid; pTableMeta->suid = msg->suid; @@ -213,12 +214,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl pTableMeta->tableInfo.precision = msg->precision; pTableMeta->tableInfo.numOfColumns = msg->numOfColumns; + memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); + for(int32_t i = 0; i < msg->numOfColumns; ++i) { pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; } - memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); - *pMeta = pTableMeta; return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index 4eafa50bdc..001756d7c3 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( qworker PRIVATE os util transport planner qcom ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 994b46c5c4..149f46273c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qError("invalid query msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + + msg->schedulerId = htobe64(msg->schedulerId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + msg->contentLen = ntohl(msg->contentLen); bool queryDone = false; bool queryRsp = false; diff --git a/source/libs/qworker/test/CMakeLists.txt b/source/libs/qworker/test/CMakeLists.txt new file mode 100644 index 0000000000..6d755ad487 --- /dev/null +++ b/source/libs/qworker/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build qworker unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST}) 
+TARGET_LINK_LIBRARIES( + qworkerTest + PUBLIC os util common transport gtest qcom planner qworker +) + +TARGET_INCLUDE_DIRECTORIES( + qworkerTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/qworker/inc" +) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp new file mode 100644 index 0000000000..4b54b77544 --- /dev/null +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "tep.h" +#include "trpc.h" +#include "planner.h" +#include "qworker.h" +#include "stub.h" +#include "addr_any.h" + + +namespace { + +int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { + return 0; +} + + +void stubSetStringToPlan() { + static Stub stub; + stub.set(qStringToSubplan, qwtStringToPlan); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qStringToSubplan$", result); + for (const auto& f : result) { + stub.set(f.second, qwtStringToPlan); + } + } +} + + +} + + +TEST(testCase, normalCase) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->schedulerId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + + SResReadyMsg readyMsg = {0}; + readyMsg.schedulerId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + + SResFetchMsg fetchMsg = {0}; + fetchMsg.schedulerId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + + STaskDropMsg dropMsg = {0}; + dropMsg.schedulerId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + + stubSetStringToPlan(); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + ASSERT_EQ(code, 0); + + code = 
qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + ASSERT_EQ(code, 0); + +} + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index bc7bc44350..2381a1dd49 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; -typedef struct SQueryLevel { +typedef struct SSchLevel { int32_t level; int8_t status; SRWLatch lock; @@ -51,12 +51,12 @@ typedef struct SQueryLevel { int32_t taskSucceed; int32_t taskNum; SArray *subTasks; // Element is SQueryTask -} SQueryLevel; +} SSchLevel; -typedef struct SQueryTask { +typedef struct SSchTask { uint64_t taskId; // task id - SQueryLevel *level; // level + SSchLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree int32_t msgLen; // msg length @@ -66,13 +66,20 @@ typedef struct SQueryTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* -} SQueryTask; +} SSchTask; -typedef struct SQueryJob { +typedef struct SSchJobAttr { + bool needFetch; + bool syncSchedule; + bool queryJob; +} SSchJobAttr; + +typedef struct SSchJob { uint64_t queryId; int32_t levelNum; int32_t levelIdx; int8_t status; + SSchJobAttr attr; SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; @@ -81,15 +88,19 @@ typedef struct SQueryJob { tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; - void *res; + SSchTask *fetchTask; + int32_t errCode; + void 
*res; + int32_t resNumOfRows; + SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SArray *levels; // Element is SQueryLevel, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. -} SQueryJob; +} SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE @@ -108,7 +119,7 @@ typedef struct SQueryJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); +extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 876a4ff4ae..503383f4b1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } -int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { +int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { - SQueryLevel *level = taosArrayGet(job->levels, i); + SSchLevel *level = taosArrayGet(job->levels, i); for (int32_t m = 0; m < level->taskNum; ++m) { - SQueryTask *task = taosArrayGet(level->subTasks, m); + SSchTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t parentNum = plan->pParents ? 
(int32_t)taosArrayGetSize(plan->pParents) : 0; @@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan *child = taosArrayGet(plan->pChildern, n); - SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); - if (childTask) { + SSubplan **child = taosArrayGet(plan->pChildern, n); + SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); + if (NULL == childTask || NULL == *childTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->children, &childTask)) { + if (NULL == taosArrayPush(task->children, childTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan *parent = taosArrayGet(plan->pParents, n); - SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); - if (parentTask) { + SSubplan **parent = taosArrayGet(plan->pParents, n); + SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); + if (NULL == parentTask || NULL == *parentTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->parents, &parentTask)) { + if (NULL == taosArrayPush(task->parents, parentTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } } - SQueryLevel *level = taosArrayGet(job->levels, 0); - if (level->taskNum > 1) { + SSchLevel *level = taosArrayGet(job->levels, 0); + if (job->attr.queryJob && level->taskNum > 1) { qError("invalid plan info, level 0, taskNum:%d", level->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - 
SQueryTask *task = taosArrayGet(level->subTasks, 0); + SSchTask *task = taosArrayGet(level->subTasks, 0); if (task->parents && taosArrayGetSize(task->parents) > 0) { qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } -int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { +int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { int32_t code = 0; job->queryId = dag->queryId; @@ -146,21 +146,23 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.needFetch = true; + job->levelNum = levelNum; job->levelIdx = levelNum - 1; job->subPlans = dag->pSubplans; - SQueryLevel level = {0}; + SSchLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; - SQueryLevel *pLevel = NULL; + SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -187,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { pLevel->taskNum = levelPlanNum; - pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -195,7 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); - SQueryTask task = {0}; + SSchTask task = {0}; + + if (plan->type == QUERY_TYPE_MODIFY) { + job->attr.needFetch = false; + } else { + job->attr.queryJob = true; + } + 
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; @@ -236,7 +245,7 @@ _return: SCH_RET(code); } -int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { +int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } @@ -263,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { } -int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { +int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -272,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); return TSDB_CODE_SUCCESS; @@ -288,10 +297,9 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); - return TSDB_CODE_SUCCESS; + qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { @@ -305,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { } -int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { +int32_t 
schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { int32_t msgSize = 0; void *msg = NULL; @@ -357,6 +365,9 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { break; } case TDMT_VND_FETCH: { + if (NULL == task) { + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } msgSize = sizeof(SResFetchMsg); msg = calloc(1, msgSize); if (NULL == msg) { @@ -395,7 +406,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { return TSDB_CODE_SUCCESS; } -int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -405,7 +416,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod } -int32_t schFetchFromRemote(SQueryJob *job) { +int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -413,7 +424,7 @@ int32_t schFetchFromRemote(SQueryJob *job) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TDMT_VND_FETCH)); + SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; @@ -424,8 +435,12 @@ _return: } -int32_t schProcessOnJobSuccess(SQueryJob *job) { - job->status = JOB_TASK_STATUS_SUCCEED; +int32_t schProcessOnJobPartialSuccess(SSchJob *job) { + job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + + if ((!job->attr.needFetch) && job->attr.syncSchedule) { + tsem_post(&job->rspSem); + } if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); @@ -434,26 +449,27 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SQueryJob *job) { +int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; + 
job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - if (job->userFetch) { + if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnDataFetched(SQueryJob *job) { +int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { +int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); @@ -464,7 +480,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { task->status = JOB_TASK_STATUS_SUCCEED; - int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); + int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0; if (parentNum == 0) { if (task->plan->level != 0) { qError("level error"); @@ -475,7 +491,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (SCH_TASK_NEED_WAIT_ALL(task)) { SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskFailed++; + task->level->taskSucceed++; taskDone = task->level->taskSucceed + task->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &task->level->lock); @@ -486,7 +502,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (task->level->taskFailed > 0) { job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR)); return TSDB_CODE_SUCCESS; } @@ -495,7 +511,9 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { job->resEp.port = task->execAddr.port; } - SCH_ERR_RET(schProcessOnJobSuccess(job)); + job->fetchTask = task; + + SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); return TSDB_CODE_SUCCESS; } @@ -508,21 +526,21 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { } for (int32_t i 
= 0; i < parentNum; ++i) { - SQueryTask *par = taosArrayGet(task->parents, i); + SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); ++par->childReady; SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schLaunchTask(job, task)); + SCH_ERR_RET(schLaunchTask(job, par)); } } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -534,7 +552,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); - return TSDB_CODE_SUCCESS; } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -550,7 +567,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod } job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, errCode)); return TSDB_CODE_SUCCESS; } @@ -560,34 +577,60 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { +int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { - case TDMT_VND_QUERY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); - if (code) { - goto _task_error; + case TDMT_VND_SUBMIT: { + SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); 
+ } else { + job->resNumOfRows += rsp->affectedRows; + + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } } + break; } - break; - case TDMT_VND_RES_READY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } + case TDMT_VND_QUERY: { + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); + if (code) { + goto _task_error; + } + } + break; + } + case TDMT_VND_RES_READY: { + SResReadyRsp *rsp = (SResReadyRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + } + case TDMT_VND_FETCH: { + SCH_ERR_JRET(rspCode); + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + + job->res = rsp; + job->resNumOfRows = rsp->numOfRows; + + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; } - break; - case TDMT_VND_FETCH: - SCH_ERR_JRET(rspCode); - SCH_ERR_JRET(schProcessOnDataFetched(job)); - break; default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; @@ -600,14 +643,14 @@ _task_error: return TSDB_CODE_SUCCESS; _return: - code = schProcessOnJobFailure(job); + code = schProcessOnJobFailure(job, code); return code; } -int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { +int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); if (plan->execEpSet.numOfEps <= 0) { @@ -630,10 +673,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchJob(SQueryJob *job) { - SQueryLevel *level = 
taosArrayGet(job->levels, job->levelIdx); +int32_t schLaunchJob(SSchJob *job) { + SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { - SQueryTask *task = taosArrayGet(level->subTasks, i); + SSchTask *task = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(job, task)); } @@ -642,10 +685,10 @@ int32_t schLaunchJob(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SQueryJob *job) { +void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); @@ -654,7 +697,7 @@ void schDropJobAllTasks(SQueryJob *job) { pIter = taosHashIterate(job->failTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); @@ -680,7 +723,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { +int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -690,11 +733,12 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } int32_t code = 0; - SQueryJob *job = calloc(1, sizeof(SQueryJob)); + SSchJob *job = calloc(1, sizeof(SSchJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.syncSchedule = syncSchedule; job->transport = transport; job->qnodeList = qnodeList; @@ -719,54 +763,104 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } tsem_init(&job->rspSem, 0, 0); - - if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, 
POINTER_BYTES)) { - qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + + code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } else { + qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } job->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(job)); - *(SQueryJob **)pJob = job; + *(SSchJob **)pJob = job; + + if (syncSchedule) { + tsem_wait(&job->rspSem); + } return TSDB_CODE_SUCCESS; _return: - *(SQueryJob **)pJob = NULL; + *(SSchJob **)pJob = NULL; scheduleFreeJob(job); SCH_RET(code); } +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) { + *numOfRows = 0; + + SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); + + SSchJob *job = *(SSchJob **)pJob; + + *numOfRows = job->resNumOfRows; + + return TSDB_CODE_SUCCESS; +} + +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false); +} + + int32_t scheduleFetchRows(void *pJob, void **data) { if (NULL == pJob || NULL == data) { - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQueryJob *job = pJob; + SSchJob *job = pJob; int32_t code = 0; - if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { - qError("prior fetching not finished"); - return TSDB_CODE_QRY_APP_ERROR; + if (!job->attr.needFetch) { + qError("no need to fetch data"); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (job->status == JOB_TASK_STATUS_FAILED) { + job->res = NULL; + SCH_RET(job->errCode); } if (job->status == JOB_TASK_STATUS_SUCCEED) { + job->res = NULL; + return 
TSDB_CODE_SUCCESS; + } + + if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { + qError("prior fetching not finished"); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(job)); } tsem_wait(&job->rspSem); + if (job->status == JOB_TASK_STATUS_FAILED) { + code = job->errCode; + } + + if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { + job->status = JOB_TASK_STATUS_SUCCEED; + } + *data = job->res; job->res = NULL; _return: atomic_val_compare_exchange_32(&job->userFetch, 1, 0); - return code; + SCH_RET(code); } int32_t scheduleCancelJob(void *pJob) { @@ -782,7 +876,7 @@ void scheduleFreeJob(void *pJob) { return; } - SQueryJob *job = pJob; + SSchJob *job = pJob; if (job->status > 0) { if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 9e94553058..4732429d0b 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -26,72 +26,310 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "catalog.h" -#include "scheduler.h" +#include "catalog.h" +#include "scheduler.h" #include "tep.h" #include "trpc.h" - +#include "schedulerInt.h" +#include "stub.h" +#include "addr_any.h" + namespace { -void mockBuildDag(SQueryDag *dag) { - uint64_t qId = 0x111111111111; - - dag->queryId = qId; - dag->numOfSubplans = 2; - dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SArray *scan = taosArrayInit(1, sizeof(SSubplan)); - SArray *merge = taosArrayInit(1, sizeof(SSubplan)); - - SSubplan scanPlan = {0}; - SSubplan mergePlan = {0}; - - scanPlan.id.queryId = qId; - scanPlan.id.templateId = 0x2222222222; - scanPlan.id.subplanId = 0x3333333333; - scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.level = 1; - scanPlan.execEpSet.numOfEps = 1; - scanPlan.pChildern = NULL; 
- scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); - - mergePlan.id.queryId = qId; - mergePlan.id.templateId = 0x4444444444; - mergePlan.id.subplanId = 0x5555555555; - mergePlan.type = QUERY_TYPE_MERGE; - mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 1; - mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); - mergePlan.pParents = NULL; - - SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); - SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); - - taosArrayPush(mergePointer->pChildern, &scanPointer); - taosArrayPush(scanPointer->pParents, &mergePointer); - - taosArrayPush(dag->pSubplans, &merge); - taosArrayPush(dag->pSubplans, &scan); -} - + +extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); + +void schtBuildQueryDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000001; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + SArray *scan = taosArrayInit(1, sizeof(SSubplan)); + SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + + SSubplan scanPlan = {0}; + SSubplan mergePlan = {0}; + + scanPlan.id.queryId = qId; + scanPlan.id.templateId = 0x0000000000000002; + scanPlan.id.subplanId = 0x0000000000000003; + scanPlan.type = QUERY_TYPE_SCAN; + scanPlan.level = 1; + scanPlan.execEpSet.numOfEps = 1; + scanPlan.execEpSet.port[0] = 6030; + strcpy(scanPlan.execEpSet.fqdn[0], "ep0"); + scanPlan.pChildern = NULL; + scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + + mergePlan.id.queryId = qId; + mergePlan.id.templateId = 0x4444444444; + mergePlan.id.subplanId = 0x5555555555; + mergePlan.type = QUERY_TYPE_MERGE; + mergePlan.level = 0; + mergePlan.execEpSet.numOfEps = 0; + mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); + mergePlan.pParents = NULL; + mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + + 
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); + SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + + taosArrayPush(mergePointer->pChildern, &scanPointer); + taosArrayPush(scanPointer->pParents, &mergePointer); + + taosArrayPush(dag->pSubplans, &merge); + taosArrayPush(dag->pSubplans, &scan); } -TEST(testCase, normalCase) { - void *mockPointer = (void *)0x1; +void schtBuildInsertDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000002; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(1, POINTER_BYTES); + SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); + + SSubplan insertPlan[2] = {0}; + + insertPlan[0].id.queryId = qId; + insertPlan[0].id.templateId = 0x0000000000000003; + insertPlan[0].id.subplanId = 0x0000000000000004; + insertPlan[0].type = QUERY_TYPE_MODIFY; + insertPlan[0].level = 0; + insertPlan[0].execEpSet.numOfEps = 1; + insertPlan[0].execEpSet.port[0] = 6030; + strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); + insertPlan[0].pChildern = NULL; + insertPlan[0].pParents = NULL; + insertPlan[0].pNode = NULL; + insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + insertPlan[1].id.queryId = qId; + insertPlan[1].id.templateId = 0x0000000000000003; + insertPlan[1].id.subplanId = 0x0000000000000005; + insertPlan[1].type = QUERY_TYPE_MODIFY; + insertPlan[1].level = 0; + insertPlan[1].execEpSet.numOfEps = 1; + insertPlan[1].execEpSet.port[0] = 6030; + strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); + insertPlan[1].pChildern = NULL; + insertPlan[1].pParents = NULL; + insertPlan[1].pNode = NULL; + insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + + taosArrayPush(inserta, &insertPlan[0]); + taosArrayPush(inserta, &insertPlan[1]); + + taosArrayPush(dag->pSubplans, &inserta); +} + + +int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { + *str = (char *)calloc(1, 20); + *len = 20; + return 0; +} + +int32_t 
schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { + return 0; +} + + +void schtSetPlanToString() { + static Stub stub; + stub.set(qSubPlanToString, schtPlanToString); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSubPlanToString$", result); + for (const auto& f : result) { + stub.set(f.second, schtPlanToString); + } + } +} + +void schtSetExecNode() { + static Stub stub; + stub.set(qSetSubplanExecutionNode, schtExecNode); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result); + for (const auto& f : result) { + stub.set(f.second, schtExecNode); + } + } +} + +void *schtSendRsp(void *param) { + SSchJob *job = NULL; + int32_t code = 0; + + while (true) { + job = *(SSchJob **)param; + if (job) { + break; + } + + usleep(1000); + } + + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SShellSubmitRspMsg rsp = {0}; + rsp.affectedRows = 10; + schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + return NULL; +} + +void *pInsertJob = NULL; + + +} + +TEST(queryTest, normalCase) { + void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; char *dbname = "1.db1"; char *tablename = "table1"; - SVgroupInfo vgInfo = {0}; - void *pJob = NULL; - SQueryDag dag = {0}; - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); - - int32_t code = schedulerInit(NULL); + SVgroupInfo vgInfo = {0}; + void *pJob = NULL; + SQueryDag dag = {0}; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - - mockBuildDag(&dag); - - code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob); + + schtBuildQueryDag(&dag); + + 
schtSetPlanToString(); + schtSetExecNode(); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); -} + + SSchJob *job = (SSchJob *)pJob; + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + SRetrieveTableRsp rsp = {0}; + rsp.completed = 1; + rsp.numOfRows = 10; + code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + + + void *data = NULL; + + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + + data = NULL; + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_EQ(data, (void*)NULL); + + scheduleFreeJob(pJob); +} + + + + +TEST(insertTest, normalCase) { + void *mockPointer = (void *)0x1; + char 
*clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + uint64_t numOfRows = 0; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildInsertDag(&dag); + + schtSetPlanToString(); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); + + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows); + ASSERT_EQ(code, 0); + ASSERT_EQ(numOfRows, 20); + + scheduleFreeJob(pInsertJob); +} + + int main(int argc, char** argv) { @@ -101,4 +339,4 @@ int main(int argc, char** argv) { - +