From c0eb9f6e8758ed88165379b944bed0dd3841f9fd Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 16:23:25 +0800 Subject: [PATCH 1/4] add ut case --- include/libs/catalog/catalog.h | 9 +- include/libs/query/query.h | 2 + source/libs/catalog/src/catalog.c | 48 ++++--- source/libs/catalog/test/CMakeLists.txt | 2 +- source/libs/catalog/test/catalogTests.cpp | 159 +++++++--------------- 5 files changed, 84 insertions(+), 136 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 1f2452291b..449064c8c6 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -77,7 +77,14 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @pVgroupList - array of SVgroupInfo * @return */ -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); + +/** + * get a table's dst vgroup from its name's hash value. + * @vgInfo - SVgroupInfo + * @return + */ +int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo); /** diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 8720fd085c..060aef9d65 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -85,6 +85,8 @@ typedef struct STableMetaOutput { extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); +extern void msgInit(); + #ifdef __cplusplus } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 9fdac36060..3aac02c1e4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -63,6 +63,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp SRpcMsg rpcRsp = {0}; rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for use db, code:%x", rpcRsp.code); + return rpcRsp.code; + } code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen); if (code) { @@ -169,9 +173,9 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE ctgGenEpSet(&epSet, vgroupInfo); rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); - + if (TSDB_CODE_SUCCESS != rpcRsp.code) { - ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code); + ctgError("error rsp for table meta, code:%x", rpcRsp.code); return rpcRsp.code; } @@ -254,24 +258,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co } -int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo dbInfo = {0}; - int32_t code = 0; - int32_t vgId = 0; - - CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); - - if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); - return TSDB_CODE_TSC_DB_NOT_SELECTED; - } - - CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); - - return code; -} - - STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { assert(pChild != NULL); @@ -554,7 +540,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe SVgroupInfo vgroupInfo = {0}; - CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); STableMetaOutput output = {0}; @@ -571,7 +557,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -607,6 +593,24 @@ _return: } +int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo dbInfo = {0}; + int32_t code = 0; + int32_t vgId = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); + + return code; +} + + int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { return TSDB_CODE_CTG_INVALID_INPUT; diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt index 527156f176..176978cc7f 100644 --- a/source/libs/catalog/test/CMakeLists.txt +++ b/source/libs/catalog/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(catalogTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( catalogTest - PUBLIC os util common catalog transport gtest query + PUBLIC os util common catalog transport gtest query taos ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index f495451091..0493ddfe8c 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include #include #include #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -23,130 +23,65 @@ #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" -#include "taos.h" +#include "taos.h" #include "tdef.h" -#include "tvariant.h" -#include "catalog.h" - +#include "tvariant.h" +#include "catalog.h" +#include "tep.h" + +typedef struct SAppInstInfo { + int64_t numOfConns; + SCorEpSet mgmtEp; +} SAppInstInfo; + +typedef struct STscObj { + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + char acctId[TSDB_ACCT_ID_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + uint32_t connId; + uint64_t id; // ref ID returned by taosAddRef +// struct SSqlObj *sqlList; + void *pTransporter; + pthread_mutex_t mutex; // used to protect the operation on db + int32_t numOfReqs; // number of sqlObj from this tscObj + SAppInstInfo *pAppInfo; +} STscObj; + namespace { - - + } - -TEST(testCase, normalCase) { - char *clusterId = "cluster1"; - struct SCatalog* pCtg = NULL; - - int32_t code = catalogInit(NULL); - ASSERT_EQ(code, 0); - - code = catalogGetHandle(clusterId, &pCtg); - ASSERT_EQ(code, 0); - - -} - -/* -TEST(testCase, normalCase) { - SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); - ASSERT_EQ(info1.valid, true); - char msg[128] = {0}; - SMsgBuf buf; - buf.len = 128; - buf.buf = msg; +TEST(testCase, normalCase) { + STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + assert(pConn != NULL); - SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0); - int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); + char *clusterId = "cluster1"; + char *dbname = "db1"; + char *tablename = "table1"; + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + msgInit(); + + int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); - SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); - ASSERT_EQ(ret, 0); - ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); + code = catalogGetHandle(clusterId, &pCtg); + ASSERT_EQ(code, 0); - SQueryStmtInfo* pQueryInfo = createQueryInfo(); - setTableMetaInfo(pQueryInfo, &req); + code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo); + ASSERT_EQ(code, 0); - SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0); - ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); - ASSERT_EQ(ret, 0); - - SArray* pExprList = pQueryInfo->exprList[0]; - - int32_t num = tsCompatibleModel? 2:1; - ASSERT_EQ(taosArrayGetSize(pExprList), num); - - SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); - ASSERT_EQ(p1->base.pColumns->uid, 110); - ASSERT_EQ(p1->base.numOfParams, 1); - ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); - ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); - ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP); - ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); - ASSERT_EQ(p1->base.interBytes, 16); - - ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); - ASSERT_STREQ(p1->pExpr->_function.functionName, "top"); - - tExprNode* pParam = p1->pExpr->_function.pChild[0]; - - ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE); - ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); - ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); - - struct SQueryPlanNode* n = nullptr; - code = createQueryPlan(pQueryInfo, &n); - - char* str = NULL; - queryPlanToString(n, &str); - printf("%s\n", str); - - destroyQueryInfo(pQueryInfo); - qParserClearupMetaRequestInfo(&req); - destroySqlInfo(&info1); + taos_close(pConn); } -TEST(testCase, displayPlan) { - generateLogicplan("select count(*) from `t.1abc`"); - generateLogicplan("select count(*)+ 22 from `t.1abc`"); - generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30"); - generateLogicplan("select count(*) from `t.1abc` group by a"); - generateLogicplan("select count(A+B) from `t.1abc` group by a"); - generateLogicplan("select count(length(a)+b) from `t.1abc` group by a"); - generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)"); - generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc "); - generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`"); - generateLogicplan("select count(*), min(a) + 99 from `t.1abc`"); - generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`"); - generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20"); - generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)"); - generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)"); - // order by + group by column + limit offset - generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1"); - - // fill - generateLogicplan("select min(a) from `t.1abc` where ts>now and ts Date: Thu, 16 Dec 2021 17:23:17 +0800 Subject: [PATCH 2/4] modify scheduler api --- include/libs/scheduler/scheduler.h | 30 ++++------------- source/libs/scheduler/inc/schedulerInt.h | 16 ++++++++- source/libs/scheduler/src/scheduler.c | 43 +++++++++++++++++++++++- 3 files changed, 63 insertions(+), 26 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 6b3c9ed021..fe22d33086 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "planner.h" + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -43,43 +45,23 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status - SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage -} SQueryTask; - -typedef struct SQueryJob { - SArray **pSubtasks; - // todo -} SQueryJob; - /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. * @param pJob * @return */ -int32_t qProcessQueryJob(struct SQueryJob* pJob); +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); -/** - * The SSqlObj should not be here???? - * @param pSql - * @param pVgroupId - * @param pRetVgroupId - * @return - */ -//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId); /** * Cancel query job * @param pJob * @return */ -int32_t qKillQueryJob(struct SQueryJob* pJob); +int32_t scheduleCancelJob(void *pJob); #ifdef __cplusplus } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index b1b128e200..1648cbfc98 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,8 +31,22 @@ typedef struct SQuery { int32_t currentLevel; } SQuery; +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + char *pSubplan; // operator tree + uint64_t status; // task status + SQueryProfileSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + +typedef struct SQueryJob { + SArray **pSubtasks; + // todo +} SQueryJob; + #ifdef __cplusplus } #endif -#endif /*_TD_SCHEDULER_INT_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_INT_H_*/ diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37f6240f9b..66fd0aa4f3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -13,4 +13,45 @@ * along with this program. If not, see . */ -#include "schedulerInt.h" \ No newline at end of file +#include "schedulerInt.h" +#include "taosmsg.h" + + +int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { +/* + SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); + if (pRequest == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SRequestMsgBody body = {0}; + buildConnectMsg(pRequest, &body); + + int64_t transporterId = 0; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + + tsem_wait(&pRequest->body.rspSem); + destroyConnectMsg(&body); + + if (pRequest->code != TSDB_CODE_SUCCESS) { + const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); + printf("failed to connect to server, reason: %s\n\n", errorMsg); + + destroyRequest(pRequest); + taos_close(pTscObj); + pTscObj = NULL; + } else { + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); + destroyRequest(pRequest); + } +*/ +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); + +int32_t scheduleCancelJob(void *pJob); + + From 5e30641d6cf39d169fae9b98b1bf3e1b6feea577 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 18:50:21 +0800 Subject: [PATCH 3/4] ut test --- source/libs/catalog/src/catalog.c | 13 +++++++- source/libs/catalog/test/catalogTests.cpp | 40 ++++++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3aac02c1e4..248cdbe51c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -54,9 +54,20 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp return code; } + char *pMsg = rpcMallocCont(msgLen); + if (NULL == pMsg) { + ctgError("rpc malloc %d failed", msgLen); + tfree(msg); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(pMsg, msg, msgLen); + + tfree(msg); + SRpcMsg rpcMsg = { .msgType = TSDB_MSG_TYPE_USE_DB, - .pCont = msg, + .pCont = pMsg, .contLen = msgLen, }; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 0493ddfe8c..e14c58d412 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -28,6 +28,7 @@ #include "tvariant.h" #include "catalog.h" #include "tep.h" +#include "trpc.h" typedef struct SAppInstInfo { int64_t numOfConns; @@ -50,6 +51,41 @@ typedef struct STscObj { namespace { +void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { + SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); + strcpy(pReq->db, "1.db1"); + pReq->numOfVgroups = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRowsPerFileBlock = htonl(100); + pReq->maxRowsPerFileBlock = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replications = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->ignoreExist = 1; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDbMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp); + + ASSERT_EQ(rpcRsp.code, 0); +} + } TEST(testCase, normalCase) { @@ -57,13 +93,15 @@ TEST(testCase, normalCase) { assert(pConn != NULL); char *clusterId = "cluster1"; - char *dbname = "db1"; + char *dbname = "1.db1"; char *tablename = "table1"; struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; msgInit(); + + sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); From 2dda97074ab2ce3b81a6ac961987446af713af98 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 09:49:25 +0800 Subject: [PATCH 4/4] fix catalog bug and modify scheduler api --- include/libs/catalog/catalog.h | 81 +++++++++++++++------ include/libs/query/query.h | 12 ++++ include/libs/scheduler/scheduler.h | 4 ++ include/util/taoserror.h | 1 + source/libs/query/inc/queryInt.h | 11 --- source/libs/query/src/querymsg.c | 2 +- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schedulerInt.h | 52 ++++++++++---- source/libs/scheduler/src/scheduler.c | 89 +++++++++++++++++++++++- source/util/src/terror.c | 1 + 10 files changed, 207 insertions(+), 48 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 449064c8c6..ee626865fb 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -32,8 +32,7 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char dbName[TSDB_DB_NAME_LEN]; - SArray *pTableName; // table full name + SArray *pTableName; // element is SNAME SArray *pUdf; // udf name bool qNodeRequired; // valid qnode } SCatalogReq; @@ -54,10 +53,10 @@ typedef struct SCatalogCfg { int32_t catalogInit(SCatalogCfg *cfg); /** - * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. - * There is ONLY one SCatalog object for one process space, and this function returns a singleton. - * @param clusterId - * @return + * Get a cluster's catalog handle for all later operations. + * @param clusterId (input, end with \0) + * @param catalogHandle (output, NO need to free it) + * @return error code */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); @@ -65,36 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); - +/** + * Get a table's meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @return error code + */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); + +/** + * Force renew a table's local cached meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @return error code + */ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); + +/** + * Force renew a table's local cached meta data and get the new one. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @return error code + */ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); /** - * get table's vgroup list. - * @param clusterId - * @pVgroupList - array of SVgroupInfo - * @return + * Get a table's actual vgroup, for stable it's all possible vgroup list. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) + * @return error code */ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); /** - * get a table's dst vgroup from its name's hash value. - * @vgInfo - SVgroupInfo - * @return + * Get a table's vgroup from its name's hash value. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param vgInfo (output, vgroup info) + * @return error code */ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo); /** - * Get the required meta data from mnode. - * Note that this is a synchronized API and is also thread-safety. - * @param pCatalog - * @param pMgmtEps - * @param pMetaReq - * @param pMetaData - * @return + * Get all meta data required in pReq. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pReq (input, reqest info) + * @param pRsp (output, response data) + * @return error code */ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); @@ -105,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S /** * Destroy catalog and relase all resources - * @param pCatalog */ void catalogDestroy(void); diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 060aef9d65..d92f7d4497 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -22,6 +22,7 @@ extern "C" { #include "tarray.h" #include "thash.h" +#include "tlog.h" typedef SVgroupListRspMsg SVgroupListInfo; @@ -88,6 +89,17 @@ extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, extern void msgInit(); +extern int32_t qDebugFlag; + +#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) + + #ifdef __cplusplus } #endif diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index fe22d33086..d73e388c20 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -22,6 +22,10 @@ extern "C" { #include "planner.h" +typedef struct SSchedulerCfg { + +} SSchedulerCfg; + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4f1ef7da7b..689d2676d1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -324,6 +324,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error") +#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input") // grant diff --git a/source/libs/query/inc/queryInt.h b/source/libs/query/inc/queryInt.h index f3204b3785..75c1e134cd 100644 --- a/source/libs/query/inc/queryInt.h +++ b/source/libs/query/inc/queryInt.h @@ -21,17 +21,6 @@ extern "C" { #endif -#include "tlog.h" - -extern int32_t qDebugFlag; - -#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 9d99b568a5..7f033c0fdf 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index fd00085381..6675b7f5ec 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner common + PRIVATE os util planner common query ) \ No newline at end of file diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 1648cbfc98..8e30ce1403 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -24,27 +24,55 @@ extern "C" { #include "tarray.h" #include "planner.h" #include "scheduler.h" +#include "thash.h" -typedef struct SQuery { - SArray **pSubquery; - int32_t numOfLevels; - int32_t currentLevel; -} SQuery; +#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 + +enum { + SCH_STATUS_NOT_START = 1, + SCH_STATUS_EXECUTING, + SCH_STATUS_SUCCEED, + SCH_STATUS_FAILED, + SCH_STATUS_CANCELLING, + SCH_STATUS_CANCELLED +}; + +typedef struct SSchedulerMgmt { + SHashObj *Jobs; // key: queryId, value: SQueryJob* +} SSchedulerMgmt; typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status + uint64_t taskId; // task id + char *pSubplan; // operator tree + int8_t status; // task status SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage } SQueryTask; +typedef struct SQueryLevel { + int8_t status; + int32_t taskNum; + + SArray *subTasks; // Element is SQueryTask + SArray *subPlans; // Element is SSubplan +} SQueryLevel; + typedef struct SQueryJob { - SArray **pSubtasks; - // todo + uint64_t queryId; + int32_t levelNum; + int32_t levelIdx; + int8_t status; + SQueryProfileSummary summary; + + SArray *levels; // Element is SQueryLevel, starting from 0. + SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryJob; + +#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 66fd0aa4f3..8d2e1ed916 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -15,6 +15,9 @@ #include "schedulerInt.h" #include "taosmsg.h" +#include "query.h" + +SSchedulerMgmt schMgmt = {0}; int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { @@ -47,11 +50,95 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } +int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { + int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); + if (levelNum <= 0) { + qError("invalid level num:%d", levelNum); + return TSDB_CODE_QRY_INVALID_INPUT; + } -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + if (NULL == job->levels) { + qError("taosArrayInit %d failed", levelNum); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + job->levelNum = levelNum; + job->levelIdx = levelNum - 1; + job->status = SCH_STATUS_NOT_START; + + job->subPlans = dag->pSubplans; + + SQueryLevel level = {0}; + SArray *levelPlans = NULL; + int32_t levelPlanNum = 0; + + for (int32_t i = 0; i < levelNum; ++i) { + levelPlans = taosArrayGetP(dag->pSubplans, i); + if (NULL == levelPlans) { + qError("no level plans for level %d", i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); + if (levelPlanNum <= 0) { + qError("invalid level plans number:%d, level:%d", levelPlanNum, i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + for (int32_t n = 0; n < levelPlanNum; ++n) { + + } + } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t schedulerInit(SSchedulerCfg *cfg) { + schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.Jobs) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { + if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + + SQueryJob *job = calloc(1, sizeof(SQueryJob)); + if (NULL == job) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + schValidateAndBuildJob(pDag, job); + + + + + + *(SQueryJob **)pJob = job; + + + + +} int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleCancelJob(void *pJob); +void schedulerDestroy(void) { + if (schMgmt.Jobs) { + taosHashCleanup(schMgmt.Jobs); //TBD + schMgmt.Jobs = NULL; + } +} + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 70a3dc622f..5518ec2a31 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") // grant