From c0eb9f6e8758ed88165379b944bed0dd3841f9fd Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 16:23:25 +0800 Subject: [PATCH 1/8] add ut case --- include/libs/catalog/catalog.h | 9 +- include/libs/query/query.h | 2 + source/libs/catalog/src/catalog.c | 48 ++++--- source/libs/catalog/test/CMakeLists.txt | 2 +- source/libs/catalog/test/catalogTests.cpp | 159 +++++++--------------- 5 files changed, 84 insertions(+), 136 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 1f2452291b..449064c8c6 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -77,7 +77,14 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @pVgroupList - array of SVgroupInfo * @return */ -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); + +/** + * get a table's dst vgroup from its name's hash value. + * @vgInfo - SVgroupInfo + * @return + */ +int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo); /** diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 8720fd085c..060aef9d65 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -85,6 +85,8 @@ typedef struct STableMetaOutput { extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); +extern void msgInit(); + #ifdef __cplusplus } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 9fdac36060..3aac02c1e4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -63,6 +63,10 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp SRpcMsg rpcRsp = {0}; rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for use db, code:%x", rpcRsp.code); + return rpcRsp.code; + } code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen); if (code) { @@ -169,9 +173,9 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE ctgGenEpSet(&epSet, vgroupInfo); rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); - + if (TSDB_CODE_SUCCESS != rpcRsp.code) { - ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code); + ctgError("error rsp for table meta, code:%x", rpcRsp.code); return rpcRsp.code; } @@ -254,24 +258,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co } -int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo dbInfo = {0}; - int32_t code = 0; - int32_t vgId = 0; - - CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); - - if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); - return TSDB_CODE_TSC_DB_NOT_SELECTED; - } - - CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); - - return code; -} - - STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { assert(pChild != NULL); @@ -554,7 +540,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe SVgroupInfo vgroupInfo = {0}; - CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); STableMetaOutput output = {0}; @@ -571,7 +557,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -607,6 +593,24 @@ _return: } +int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo dbInfo = {0}; + int32_t code = 0; + int32_t vgId = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup)); + + return code; +} + + int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { return TSDB_CODE_CTG_INVALID_INPUT; diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt index 527156f176..176978cc7f 100644 --- a/source/libs/catalog/test/CMakeLists.txt +++ b/source/libs/catalog/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(catalogTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( catalogTest - PUBLIC os util common catalog transport gtest query + PUBLIC os util common catalog transport gtest query taos ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index f495451091..0493ddfe8c 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include #include #include #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -23,130 +23,65 @@ #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" -#include "taos.h" +#include "taos.h" #include "tdef.h" -#include "tvariant.h" -#include "catalog.h" - +#include "tvariant.h" +#include "catalog.h" +#include "tep.h" + +typedef struct SAppInstInfo { + int64_t numOfConns; + SCorEpSet mgmtEp; +} SAppInstInfo; + +typedef struct STscObj { + char user[TSDB_USER_LEN]; + char pass[TSDB_PASSWORD_LEN]; + char acctId[TSDB_ACCT_ID_LEN]; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + uint32_t connId; + uint64_t id; // ref ID returned by taosAddRef +// struct SSqlObj *sqlList; + void *pTransporter; + pthread_mutex_t mutex; // used to protect the operation on db + int32_t numOfReqs; // number of sqlObj from this tscObj + SAppInstInfo *pAppInfo; +} STscObj; + namespace { - - + } - -TEST(testCase, normalCase) { - char *clusterId = "cluster1"; - struct SCatalog* pCtg = NULL; - - int32_t code = catalogInit(NULL); - ASSERT_EQ(code, 0); - - code = catalogGetHandle(clusterId, &pCtg); - ASSERT_EQ(code, 0); - - -} - -/* -TEST(testCase, normalCase) { - SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); - ASSERT_EQ(info1.valid, true); - char msg[128] = {0}; - SMsgBuf buf; - buf.len = 128; - buf.buf = msg; +TEST(testCase, normalCase) { + STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + assert(pConn != NULL); - SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0); - int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); + char *clusterId = "cluster1"; + char *dbname = "db1"; + char *tablename = "table1"; + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + msgInit(); + + int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); - SCatalogReq req = {0}; - int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); - ASSERT_EQ(ret, 0); - ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); + code = catalogGetHandle(clusterId, &pCtg); + ASSERT_EQ(code, 0); - SQueryStmtInfo* pQueryInfo = createQueryInfo(); - setTableMetaInfo(pQueryInfo, &req); + code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo); + ASSERT_EQ(code, 0); - SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0); - ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); - ASSERT_EQ(ret, 0); - - SArray* pExprList = pQueryInfo->exprList[0]; - - int32_t num = tsCompatibleModel? 2:1; - ASSERT_EQ(taosArrayGetSize(pExprList), num); - - SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); - ASSERT_EQ(p1->base.pColumns->uid, 110); - ASSERT_EQ(p1->base.numOfParams, 1); - ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); - ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); - ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP); - ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); - ASSERT_EQ(p1->base.interBytes, 16); - - ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); - ASSERT_STREQ(p1->pExpr->_function.functionName, "top"); - - tExprNode* pParam = p1->pExpr->_function.pChild[0]; - - ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE); - ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); - ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); - - struct SQueryPlanNode* n = nullptr; - code = createQueryPlan(pQueryInfo, &n); - - char* str = NULL; - queryPlanToString(n, &str); - printf("%s\n", str); - - destroyQueryInfo(pQueryInfo); - qParserClearupMetaRequestInfo(&req); - destroySqlInfo(&info1); + taos_close(pConn); } -TEST(testCase, displayPlan) { - generateLogicplan("select count(*) from `t.1abc`"); - generateLogicplan("select count(*)+ 22 from `t.1abc`"); - generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30"); - generateLogicplan("select count(*) from `t.1abc` group by a"); - generateLogicplan("select count(A+B) from `t.1abc` group by a"); - generateLogicplan("select count(length(a)+b) from `t.1abc` group by a"); - generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)"); - generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc "); - generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`"); - generateLogicplan("select count(*), min(a) + 99 from `t.1abc`"); - generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`"); - generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20"); - generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)"); - generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)"); - // order by + group by column + limit offset - generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1"); - - // fill - generateLogicplan("select min(a) from `t.1abc` where ts>now and ts Date: Thu, 16 Dec 2021 17:23:17 +0800 Subject: [PATCH 2/8] modify scheduler api --- include/libs/scheduler/scheduler.h | 30 ++++------------- source/libs/scheduler/inc/schedulerInt.h | 16 ++++++++- source/libs/scheduler/src/scheduler.c | 43 +++++++++++++++++++++++- 3 files changed, 63 insertions(+), 26 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 6b3c9ed021..fe22d33086 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "planner.h" + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -43,43 +45,23 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status - SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage -} SQueryTask; - -typedef struct SQueryJob { - SArray **pSubtasks; - // todo -} SQueryJob; - /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. * @param pJob * @return */ -int32_t qProcessQueryJob(struct SQueryJob* pJob); +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); -/** - * The SSqlObj should not be here???? - * @param pSql - * @param pVgroupId - * @param pRetVgroupId - * @return - */ -//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId); /** * Cancel query job * @param pJob * @return */ -int32_t qKillQueryJob(struct SQueryJob* pJob); +int32_t scheduleCancelJob(void *pJob); #ifdef __cplusplus } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index b1b128e200..1648cbfc98 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,8 +31,22 @@ typedef struct SQuery { int32_t currentLevel; } SQuery; +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + char *pSubplan; // operator tree + uint64_t status; // task status + SQueryProfileSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + +typedef struct SQueryJob { + SArray **pSubtasks; + // todo +} SQueryJob; + #ifdef __cplusplus } #endif -#endif /*_TD_SCHEDULER_INT_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_INT_H_*/ diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37f6240f9b..66fd0aa4f3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -13,4 +13,45 @@ * along with this program. If not, see . */ -#include "schedulerInt.h" \ No newline at end of file +#include "schedulerInt.h" +#include "taosmsg.h" + + +int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { +/* + SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); + if (pRequest == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SRequestMsgBody body = {0}; + buildConnectMsg(pRequest, &body); + + int64_t transporterId = 0; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + + tsem_wait(&pRequest->body.rspSem); + destroyConnectMsg(&body); + + if (pRequest->code != TSDB_CODE_SUCCESS) { + const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); + printf("failed to connect to server, reason: %s\n\n", errorMsg); + + destroyRequest(pRequest); + taos_close(pTscObj); + pTscObj = NULL; + } else { + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); + destroyRequest(pRequest); + } +*/ +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); + +int32_t scheduleCancelJob(void *pJob); + + From 5e30641d6cf39d169fae9b98b1bf3e1b6feea577 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 18:50:21 +0800 Subject: [PATCH 3/8] ut test --- source/libs/catalog/src/catalog.c | 13 +++++++- source/libs/catalog/test/catalogTests.cpp | 40 ++++++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3aac02c1e4..248cdbe51c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -54,9 +54,20 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp return code; } + char *pMsg = rpcMallocCont(msgLen); + if (NULL == pMsg) { + ctgError("rpc malloc %d failed", msgLen); + tfree(msg); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(pMsg, msg, msgLen); + + tfree(msg); + SRpcMsg rpcMsg = { .msgType = TSDB_MSG_TYPE_USE_DB, - .pCont = msg, + .pCont = pMsg, .contLen = msgLen, }; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 0493ddfe8c..e14c58d412 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -28,6 +28,7 @@ #include "tvariant.h" #include "catalog.h" #include "tep.h" +#include "trpc.h" typedef struct SAppInstInfo { int64_t numOfConns; @@ -50,6 +51,41 @@ typedef struct STscObj { namespace { +void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { + SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); + strcpy(pReq->db, "1.db1"); + pReq->numOfVgroups = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRowsPerFileBlock = htonl(100); + pReq->maxRowsPerFileBlock = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replications = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->ignoreExist = 1; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDbMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp); + + ASSERT_EQ(rpcRsp.code, 0); +} + } TEST(testCase, normalCase) { @@ -57,13 +93,15 @@ TEST(testCase, normalCase) { assert(pConn != NULL); char *clusterId = "cluster1"; - char *dbname = "db1"; + char *dbname = "1.db1"; char *tablename = "table1"; struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; msgInit(); + + sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); From f0110af30ec8537bee226a1dff317122928d3d48 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Dec 2021 22:04:47 +0800 Subject: [PATCH 4/8] add index cache --- include/libs/index/index.h | 9 ++++ source/libs/index/inc/indexInt.h | 11 +++++ source/libs/index/src/index.c | 47 +++++++++++++++----- source/libs/index/src/index_fst.c | 2 +- source/libs/index/src/index_fst_automation.c | 11 ++++- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 1b74928568..0885ce151e 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -28,6 +28,15 @@ typedef struct SIndexOpts SIndexOpts; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SArray SIndexMultiTerm; +typedef enum { + ADD_VALUE, // add index colume value + DEL_VALUE, // delete index column value + UPDATE_VALUE, // update index column value + ADD_INDEX, // add index on specify column + DROP_INDEX, // drop existed index + DROP_SATBLE // drop stable +} SIndexColumnType; + typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType; /* diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a6862c05c8..f6ff9bc139 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -17,7 +17,10 @@ #define _TD_INDEX_INT_H_ #include "index.h" +#include "index_fst.h" #include "tlog.h" +#include "thash.h" +#include "taos.h" #ifdef USE_LUCENE #include @@ -32,12 +35,20 @@ struct SIndex { #ifdef USE_LUCENE index_t *index; #endif + void *cache; + void *tindex; + SHashObj *fieldObj; // + uint64_t suid; + int fieldId; + pthread_mutex_t mtx; }; struct SIndexOpts { #ifdef USE_LUCENE void *opts; #endif + int32_t numOfItermLimit; + int8_t mergeInterval; }; struct SIndexMultiTermQuery { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index f8f4311a4a..c011411189 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -15,39 +15,58 @@ #include "index.h" #include "indexInt.h" +#include "index_cache.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif -static pthread_once_t isInit = PTHREAD_ONCE_INIT; +typedef struct SIdxFieldInfo { + int id; // generated by index internal + int type; // field type +} SIdxFieldInfo; + +static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); +static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { + if (sIdx == NULL) { + return -1; + } + indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + return 0; +} SIndex *indexOpen(SIndexOpts *opts, const char *path) { pthread_once(&isInit, indexInit); + SIndex *sIdx = malloc(sizeof(SIndex)); + #ifdef USE_LUCENE index_t *index = index_open(path); - SIndex *p = malloc(sizeof(SIndex)); - p->index = index; - return p; + sIdx->index = index; #endif - return NULL; + + sIdx->cache = (void*)indexCacheCreate(); + sIdx->tindex = NULL; + sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pthread_mutex_init(&sIdx->mtx, NULL); + return sIdx; } -void indexClose(SIndex *index) { +void indexClose(SIndex *sIdx) { #ifdef USE_LUCENE - index_close(index->index); - index->index = NULL; + index_close(sIdex->index); + sIdx->index = NULL; #endif - free(index); + indexCacheDestroy(sIdx->cache); + taosHashCleanup(sIdx->fieldObj); + pthread_mutex_destroy(&sIdx->mtx); + free(sIdx); return; - } -#ifdef USE_LUCENE -#endif int indexPut(SIndex *index, SArray* field_vals, int uid) { + #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -63,6 +82,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { index_put(index->index, doc); index_document_destroy(doc); #endif + pthread_mutex_lock(&index->mtx); + pthread_mutex_unlock(&index->mtx); return 1; } @@ -105,7 +126,9 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result return 1; } + int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { + return 1; } int indexRebuild(SIndex *index, SIndexOpts *opts); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 9cb4ac6836..7aaa498864 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1330,7 +1330,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb SArray *nodes = taosArrayInit(8, sizeof(FstNode *)); while (taosArrayGetSize(sws->stack) > 0) { StreamState *p = (StreamState *)taosArrayPop(sws->stack); - if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) { + if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index f70b90041b..07ad45079b 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -87,9 +87,18 @@ static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) { if (ssv == NULL || ctx == NULL) {return NULL;} char *data = ctx->data; + if (ssv->kind == Done) { + return startWithStateValueCreate(Done, FST_INT, &ssv->val); + } if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { int val = ssv->val + 1; - return startWithStateValueCreate(Running, FST_INT, &val); + StartWithStateValue *nsv = startWithStateValueCreate(Running, FST_INT, &val); + if (prefixIsMatch(ctx, nsv)) { + nsv->kind = Done; + } else { + nsv->kind = Running; + } + return nsv; } return NULL; } From 7daca7ff81d02dc4fb0786fb30ed399b3fd9ae1f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Dec 2021 22:07:05 +0800 Subject: [PATCH 5/8] add index cache --- source/libs/index/inc/index_cache.h | 53 +++++++++++++ source/libs/index/src/index_cache.c | 119 ++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 source/libs/index/inc/index_cache.h create mode 100644 source/libs/index/src/index_cache.c diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h new file mode 100644 index 0000000000..27e095ff31 --- /dev/null +++ b/source/libs/index/inc/index_cache.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef __INDEX_CACHE_H__ +#define __INDEX_CACHE_H__ + +#include "index.h" +#include "tlockfree.h" +// ----------------- row structure in skiplist --------------------- + +/* A data row, the format is like below: + * |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->| + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct IndexCache { + T_REF_DECLARE() + int cVersion; // +} IndexCache; + + +// +IndexCache *indexCacheCreate(); + +void indexCacheDestroy(IndexCache *cache); + +int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType); + +int indexCacheGet(IndexCache *cache, uint64_t *rst); +int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result); + +#ifdef __cplusplus +} +#endif + + + +#endif diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c new file mode 100644 index 0000000000..7c355b0f0a --- /dev/null +++ b/source/libs/index/src/index_cache.c @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "index_cache.h" + +static int32_t compareKey(const void *l, const void *r) { + char *lp = (char *)l; + char *rp = (char *)r; + + // skip total len + int32_t ll, rl; // len + memcpy(&ll, lp, sizeof(int32_t)); + memcpy(&rl, rp, sizeof(int32_t)); + lp += sizeof(int32_t); + rp += sizeof(int32_t); + + // compare field id + int32_t lf, rf; // field id + memcpy(&lf, lp, sizeof(lf)); + memcpy(&rf, rp, sizeof(rf)); + if (lf != rf) { + return lf < rf ? -1: 1; + } + lp += sizeof(lf); + rp += sizeof(rf); + + // compare field value + int32_t lfl, rfl; + memcpy(&lfl, lp, sizeof(lfl)); + memcpy(&rfl, rp, sizeof(rfl)); + lp += sizeof(lfl); + rp += sizeof(rfl); + + //refator later + int32_t i, j; + for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { + if (lp[i] == rp[j]) { continue; } + else { return lp[i] < rp[j] ? -1 : 1;} + } + if (i < lfl) { return 1;} + else if (j < rfl) { return -1; } + lp += lfl; + rp += rfl; + + // compare version + int32_t lv, rv; + memcpy(&lv, lp, sizeof(lv)); + memcpy(&rv, rp, sizeof(rv)); + if (lv != rv) { + return lv > rv ? -1 : 1; + } + lp += sizeof(lv); + rp += sizeof(rv); + + + return 0; + +} +IndexCache *indexCacheCreate() { + IndexCache *cache = calloc(1, sizeof(IndexCache)); + return cache; +} + +void indexCacheDestroy(IndexCache *cache) { + free(cache); +} + +int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { + if (cache == NULL) { return -1;} + int32_t version = T_REF_INC(cache); + + int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType); + + char *buf = calloc(1, total); + char *p = buf; + + memcpy(buf, &total, sizeof(total)); + total += total; + + memcpy(buf, &fieldId, sizeof(fieldId)); + buf += sizeof(fieldId); + + memcpy(buf, &fvlen, sizeof(fvlen)); + buf += sizeof(fvlen); + memcpy(buf, fieldValue, fvlen); + buf += fvlen; + + memcpy(buf, &version, sizeof(version)); + buf += sizeof(version); + + memcpy(buf, &uid, sizeof(uid)); + buf += sizeof(uid); + + memcpy(buf, &operType, sizeof(operType)); + buf += sizeof(operType); + + +} +int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) { + + return 0; +} + + + + + From 2dda97074ab2ce3b81a6ac961987446af713af98 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 09:49:25 +0800 Subject: [PATCH 6/8] fix catalog bug and modify scheduler api --- include/libs/catalog/catalog.h | 81 +++++++++++++++------ include/libs/query/query.h | 12 ++++ include/libs/scheduler/scheduler.h | 4 ++ include/util/taoserror.h | 1 + source/libs/query/inc/queryInt.h | 11 --- source/libs/query/src/querymsg.c | 2 +- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schedulerInt.h | 52 ++++++++++---- source/libs/scheduler/src/scheduler.c | 89 +++++++++++++++++++++++- source/util/src/terror.c | 1 + 10 files changed, 207 insertions(+), 48 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 449064c8c6..ee626865fb 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -32,8 +32,7 @@ extern "C" { struct SCatalog; typedef struct SCatalogReq { - char dbName[TSDB_DB_NAME_LEN]; - SArray *pTableName; // table full name + SArray *pTableName; // element is SNAME SArray *pUdf; // udf name bool qNodeRequired; // valid qnode } SCatalogReq; @@ -54,10 +53,10 @@ typedef struct SCatalogCfg { int32_t catalogInit(SCatalogCfg *cfg); /** - * Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side. - * There is ONLY one SCatalog object for one process space, and this function returns a singleton. - * @param clusterId - * @return + * Get a cluster's catalog handle for all later operations. + * @param clusterId (input, end with \0) + * @param catalogHandle (output, NO need to free it) + * @return error code */ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); @@ -65,36 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); - +/** + * Get a table's meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @return error code + */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); + +/** + * Force renew a table's local cached meta data. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @return error code + */ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); + +/** + * Force renew a table's local cached meta data and get the new one. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @return error code + */ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); /** - * get table's vgroup list. - * @param clusterId - * @pVgroupList - array of SVgroupInfo - * @return + * Get a table's actual vgroup, for stable it's all possible vgroup list. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) + * @return error code */ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); /** - * get a table's dst vgroup from its name's hash value. - * @vgInfo - SVgroupInfo - * @return + * Get a table's vgroup from its name's hash value. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param pTableName (input, table name, NOT including db name) + * @param vgInfo (output, vgroup info) + * @return error code */ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo); /** - * Get the required meta data from mnode. - * Note that this is a synchronized API and is also thread-safety. - * @param pCatalog - * @param pMgmtEps - * @param pMetaReq - * @param pMetaData - * @return + * Get all meta data required in pReq. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pReq (input, reqest info) + * @param pRsp (output, response data) + * @return error code */ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); @@ -105,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S /** * Destroy catalog and relase all resources - * @param pCatalog */ void catalogDestroy(void); diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 060aef9d65..d92f7d4497 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -22,6 +22,7 @@ extern "C" { #include "tarray.h" #include "thash.h" +#include "tlog.h" typedef SVgroupListRspMsg SVgroupListInfo; @@ -88,6 +89,17 @@ extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, extern void msgInit(); +extern int32_t qDebugFlag; + +#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) +#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) + + #ifdef __cplusplus } #endif diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index fe22d33086..d73e388c20 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -22,6 +22,10 @@ extern "C" { #include "planner.h" +typedef struct SSchedulerCfg { + +} SSchedulerCfg; + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4f1ef7da7b..689d2676d1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -324,6 +324,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error") +#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input") // grant diff --git a/source/libs/query/inc/queryInt.h b/source/libs/query/inc/queryInt.h index f3204b3785..75c1e134cd 100644 --- a/source/libs/query/inc/queryInt.h +++ b/source/libs/query/inc/queryInt.h @@ -21,17 +21,6 @@ extern "C" { #endif -#include "tlog.h" - -extern int32_t qDebugFlag; - -#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0) #ifdef __cplusplus } diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 9d99b568a5..7f033c0fdf 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index fd00085381..6675b7f5ec 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner common + PRIVATE os util planner common query ) \ No newline at end of file diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 1648cbfc98..8e30ce1403 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -24,27 +24,55 @@ extern "C" { #include "tarray.h" #include "planner.h" #include "scheduler.h" +#include "thash.h" -typedef struct SQuery { - SArray **pSubquery; - int32_t numOfLevels; - int32_t currentLevel; -} SQuery; +#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 + +enum { + SCH_STATUS_NOT_START = 1, + SCH_STATUS_EXECUTING, + SCH_STATUS_SUCCEED, + SCH_STATUS_FAILED, + SCH_STATUS_CANCELLING, + SCH_STATUS_CANCELLED +}; + +typedef struct SSchedulerMgmt { + SHashObj *Jobs; // key: queryId, value: SQueryJob* +} SSchedulerMgmt; typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status + uint64_t taskId; // task id + char *pSubplan; // operator tree + int8_t status; // task status SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage } SQueryTask; +typedef struct SQueryLevel { + int8_t status; + int32_t taskNum; + + SArray *subTasks; // Element is SQueryTask + SArray *subPlans; // Element is SSubplan +} SQueryLevel; + typedef struct SQueryJob { - SArray **pSubtasks; - // todo + uint64_t queryId; + int32_t levelNum; + int32_t levelIdx; + int8_t status; + SQueryProfileSummary summary; + + SArray *levels; // Element is SQueryLevel, starting from 0. + SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryJob; + +#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) +#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 66fd0aa4f3..8d2e1ed916 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -15,6 +15,9 @@ #include "schedulerInt.h" #include "taosmsg.h" +#include "query.h" + +SSchedulerMgmt schMgmt = {0}; int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { @@ -47,11 +50,95 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } +int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { + int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); + if (levelNum <= 0) { + qError("invalid level num:%d", levelNum); + return TSDB_CODE_QRY_INVALID_INPUT; + } -int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + if (NULL == job->levels) { + qError("taosArrayInit %d failed", levelNum); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + job->levelNum = levelNum; + job->levelIdx = levelNum - 1; + job->status = SCH_STATUS_NOT_START; + + job->subPlans = dag->pSubplans; + + SQueryLevel level = {0}; + SArray *levelPlans = NULL; + int32_t levelPlanNum = 0; + + for (int32_t i = 0; i < levelNum; ++i) { + levelPlans = taosArrayGetP(dag->pSubplans, i); + if (NULL == levelPlans) { + qError("no level plans for level %d", i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); + if (levelPlanNum <= 0) { + qError("invalid level plans number:%d, level:%d", levelPlanNum, i); + return TSDB_CODE_QRY_INVALID_INPUT; + } + + for (int32_t n = 0; n < levelPlanNum; ++n) { + + } + } + + return TSDB_CODE_SUCCESS; +} + + + +int32_t schedulerInit(SSchedulerCfg *cfg) { + schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.Jobs) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { + if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + + SQueryJob *job = calloc(1, sizeof(SQueryJob)); + if (NULL == job) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + schValidateAndBuildJob(pDag, job); + + + + + + *(SQueryJob **)pJob = job; + + + + +} int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleCancelJob(void *pJob); +void schedulerDestroy(void) { + if (schMgmt.Jobs) { + taosHashCleanup(schMgmt.Jobs); //TBD + schMgmt.Jobs = NULL; + } +} + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 70a3dc622f..5518ec2a31 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") // grant From 48507aecc0c94226bdbe53050b63ba26547b6bcc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Dec 2021 11:22:31 +0800 Subject: [PATCH 7/8] [td-11818] support show databases; --- include/client/taos.h | 4 +- include/common/taosmsg.h | 5 +- include/common/tmsgtype.h | 2 +- include/os/os.h | 1 + include/os/osSysinfo.h | 12 ++- include/util/tmacro.h | 42 -------- source/client/inc/clientInt.h | 14 ++- source/client/src/clientImpl.c | 82 ++++++++++---- source/client/src/clientMain.c | 131 +++++++++++++++++++++-- source/client/src/clientMsgHandler.c | 79 +++++++++++--- source/client/src/tscEnv.c | 8 +- source/client/test/clientTests.cpp | 60 +++++++++-- source/dnode/mnode/impl/src/mndCluster.c | 8 +- source/dnode/mnode/impl/src/mndDb.c | 38 +++---- source/dnode/mnode/impl/src/mndDnode.c | 30 +++--- source/dnode/mnode/impl/src/mndFunc.c | 16 +-- source/dnode/mnode/impl/src/mndMnode.c | 18 ++-- source/dnode/mnode/impl/src/mndProfile.c | 74 ++++++------- source/dnode/mnode/impl/src/mndStb.c | 12 +-- source/dnode/mnode/impl/src/mndSync.c | 36 ------- source/dnode/mnode/impl/src/mndUser.c | 8 +- source/dnode/mnode/impl/src/mndVgroup.c | 16 +-- source/libs/parser/src/parser.c | 13 +-- source/libs/query/src/querymsg.c | 4 +- source/os/src/osSysinfo.c | 10 +- src/client/src/tscAsync.c | 6 +- src/client/src/tscLocal.c | 2 +- src/client/src/tscServer.c | 10 +- src/client/src/tscSql.c | 6 +- src/client/src/tscSubquery.c | 2 +- 30 files changed, 466 insertions(+), 283 deletions(-) delete mode 100644 include/util/tmacro.h delete mode 100644 source/dnode/mnode/impl/src/mndSync.c diff --git a/include/client/taos.h b/include/client/taos.h index eb3332ed18..7357478555 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -80,8 +80,8 @@ typedef enum { typedef struct taosField { char name[65]; - uint8_t type; - int16_t bytes; + int8_t type; + int32_t bytes; } TAOS_FIELD; #ifdef _TD_GO_DLL_ diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 9f5dc0c666..7abeb97b27 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -572,14 +572,13 @@ typedef struct { } SRetrieveTableMsg; typedef struct SRetrieveTableRsp { - int32_t numOfRows; - int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; - int8_t reserved; int32_t compLen; + + int32_t numOfRows; char data[]; } SRetrieveTableRsp; diff --git a/include/common/tmsgtype.h b/include/common/tmsgtype.h index d357ca1f47..1fb10ae15b 100644 --- a/include/common/tmsgtype.h +++ b/include/common/tmsgtype.h @@ -62,7 +62,7 @@ enum { TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_DNODE, "cfg-dnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_MNODE, "cfg-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW, "show" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE, "retrieve" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_MNODE, "retrieve" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" ) diff --git a/include/os/os.h b/include/os/os.h index 53a6cef96a..de2a8182db 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -46,6 +46,7 @@ extern "C" { #include #include #include +#include #include #include diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 56f6b3e0da..6952b91742 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "os.h" + #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 96 @@ -57,11 +59,11 @@ char * taosGetCmdlineByPID(int pid); void taosSetCoreDump(bool enable); typedef struct { - const char *sysname; - const char *nodename; - const char *release; - const char *version; - const char *machine; + char sysname[_UTSNAME_MACHINE_LENGTH]; + char nodename[_UTSNAME_MACHINE_LENGTH]; + char release[_UTSNAME_MACHINE_LENGTH]; + char version[_UTSNAME_MACHINE_LENGTH]; + char machine[_UTSNAME_MACHINE_LENGTH]; } SysNameInfo; SysNameInfo taosGetSysNameInfo(); diff --git a/include/util/tmacro.h b/include/util/tmacro.h deleted file mode 100644 index 5cca8a1062..0000000000 --- a/include/util/tmacro.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_UTIL_MACRO_H_ -#define _TD_UTIL_MACRO_H_ - -#include "os.h" - -#ifdef __cplusplus -extern "C" { -#endif - -// Module init/clear MACRO definitions -#define TD_MOD_UNINITIALIZED 0 -#define TD_MOD_INITIALIZED 1 - -#define TD_MOD_UNCLEARD 0 -#define TD_MOD_CLEARD 1 - -typedef int8_t td_mode_flag_t; - -#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED) - -#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD) - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_UTIL_MACRO_H_*/ \ No newline at end of file diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 87951d7944..9ef1d67e74 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,8 +20,8 @@ extern "C" { #endif -#include #include "taos.h" +#include "common.h" #include "taosmsg.h" #include "tdef.h" #include "tep.h" @@ -88,9 +88,15 @@ typedef struct STscObj { } STscObj; typedef struct SClientResultInfo { - SSDataBlock *pData; - TAOS_FIELD *resultFields; + const char *pMsg; + const char *pData; + TAOS_FIELD *fields; + int32_t numOfCols; + int32_t numOfRows; int32_t current; + int32_t *length; + TAOS_ROW row; + char **pCol; } SClientResultInfo; typedef struct SReqBody { @@ -98,6 +104,7 @@ typedef struct SReqBody { void* fp; void* param; int32_t paramLen; + int64_t execId; // showId/queryId SClientResultInfo* pResInfo; } SRequestBody; @@ -152,6 +159,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); void* doFetchRow(SRequestObj* pRequest); +void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index fb98a7da46..4815e862e8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -12,7 +12,7 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); -static void destroyConnectMsg(SRequestMsgBody* pMsgBody); +static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody); static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId); @@ -99,17 +99,19 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, char* key = getClusterKey(user, secretEncrypt, ip, port); - SAppInstInfo* pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL) { - pInst = calloc(1, sizeof(struct SAppInstInfo)); + SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); - pInst->mgmtEp = epSet; - pInst->pTransporter = openTransporter(user, secretEncrypt); + p->mgmtEp = epSet; + p->pTransporter = openTransporter(user, secretEncrypt); + taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); - taosHashPut(appInfo.pInstMap, key, strlen(key), &pInst, POINTER_BYTES); + pInst = &p; } - return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, pInst); + tfree(key); + return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst); } TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { @@ -163,7 +165,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); - destroyConnectMsg(&body); + destroyRequestMsgBody(&body); } else { assert(0); } @@ -234,7 +236,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); - destroyConnectMsg(&body); + destroyRequestMsgBody(&body); if (pRequest->code != TSDB_CODE_SUCCESS) { const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); @@ -281,7 +283,7 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) return 0; } -static void destroyConnectMsg(SRequestMsgBody* pMsgBody) { +static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { assert(pMsgBody != NULL); tfree(pMsgBody->pData); } @@ -337,7 +339,14 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType], tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); if (handleRequestRspFp[pRequest->type]) { - pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen); + char *p = malloc(pMsg->contLen); + if (p == NULL) { + pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + terrno = pRequest->code; + } else { + memcpy(p, pMsg->pCont, pMsg->contLen); + pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen); + } } } else { tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType], @@ -381,15 +390,48 @@ void* doFetchRow(SRequestObj* pRequest) { assert(pRequest != NULL); SClientResultInfo* pResultInfo = pRequest->body.pResInfo; - if (pResultInfo == NULL || pResultInfo->current >= pResultInfo->pData->info.rows) { - if (pResultInfo == NULL) { - pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); -// pRequest->body.pResInfo. + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { + pRequest->type = TSDB_SQL_RETRIEVE_MNODE; + + SRequestMsgBody body = {0}; + buildRequestMsgFp[pRequest->type](pRequest, &body); + + int64_t transporterId = 0; + STscObj* pTscObj = pRequest->pTscObj; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + + tsem_wait(&pRequest->body.rspSem); + destroyRequestMsgBody(&body); + + pResultInfo->current = 0; + if (pResultInfo->numOfRows <= pResultInfo->current) { + return NULL; } - // current data set are exhausted, fetch more result from node -// if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) { -// taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); -// tsem_wait(&pSql->rspSem); -// } + } + + for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) { + pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; + if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { + pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); + pResultInfo->row[i] = varDataVal(pResultInfo->row[i]); + } + } + + pResultInfo->current += 1; + return pResultInfo->row; +} + +void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { + assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); + if (numOfRows == 0) { + return; + } + + int32_t offset = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + pResultInfo->length[i] = pResultInfo->fields[i].bytes; + pResultInfo->row[i] = pResultInfo->pData + offset * pResultInfo->numOfRows; + pResultInfo->pCol[i] = pResultInfo->row[i]; + offset += pResultInfo->fields[i].bytes; } } \ No newline at end of file diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 5a1f55e573..e3ec7c27ee 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -28,7 +28,6 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { } int ret = taos_options_imp(option, (const char*)arg); - atomic_store_32(&lock, 0); return ret; } @@ -58,18 +57,18 @@ void taos_cleanup(void) { } TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { - int32_t p = (port != 0)? port:tsServerPort; + int32_t p = (port != 0) ? port : tsServerPort; - tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db); - if (user == NULL) { - user = TSDB_DEFAULT_USER; - } + tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db); + if (user == NULL) { + user = TSDB_DEFAULT_USER; + } - if (pass == NULL) { - pass = TSDB_DEFAULT_PASS; - } + if (pass == NULL) { + pass = TSDB_DEFAULT_PASS; + } - return taos_connect_internal(ip, user, pass, NULL, db, p); + return taos_connect_internal(ip, user, pass, NULL, db, p); } void taos_close(TAOS* taos) { @@ -110,6 +109,34 @@ void taos_free_result(TAOS_RES *res) { destroyRequest(pRequest); } +int taos_field_count(TAOS_RES *res) { + if (res == NULL) { + return 0; + } + + SRequestObj* pRequest = (SRequestObj*) res; + + SClientResultInfo* pResInfo = pRequest->body.pResInfo; + if (pResInfo == NULL) { + return 0; + } + + return pResInfo->numOfCols; +} + +int taos_num_fields(TAOS_RES *res) { + return taos_field_count(res); +} + +TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { + if (taos_num_fields(res) == 0) { + return NULL; + } + + SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo; + return pResInfo->fields; +} + TAOS_RES *taos_query(TAOS *taos, const char *sql) { if (taos == NULL || sql == NULL) { return NULL; @@ -131,3 +158,87 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { return doFetchRow(pRequest); } + +int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { + int32_t len = 0; + for (int i = 0; i < num_fields; ++i) { + if (i > 0) { + str[len++] = ' '; + } + + if (row[i] == NULL) { + len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR); + continue; + } + + switch (fields[i].type) { + case TSDB_DATA_TYPE_TINYINT: + len += sprintf(str + len, "%d", *((int8_t *)row[i])); + break; + + case TSDB_DATA_TYPE_UTINYINT: + len += sprintf(str + len, "%u", *((uint8_t *)row[i])); + break; + + case TSDB_DATA_TYPE_SMALLINT: + len += sprintf(str + len, "%d", *((int16_t *)row[i])); + break; + + case TSDB_DATA_TYPE_USMALLINT: + len += sprintf(str + len, "%u", *((uint16_t *)row[i])); + break; + + case TSDB_DATA_TYPE_INT: + len += sprintf(str + len, "%d", *((int32_t *)row[i])); + break; + + case TSDB_DATA_TYPE_UINT: + len += sprintf(str + len, "%u", *((uint32_t *)row[i])); + break; + + case TSDB_DATA_TYPE_BIGINT: + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); + break; + + case TSDB_DATA_TYPE_UBIGINT: + len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i])); + break; + + case TSDB_DATA_TYPE_FLOAT: { + float fv = 0; + fv = GET_FLOAT_VAL(row[i]); + len += sprintf(str + len, "%f", fv); + } break; + + case TSDB_DATA_TYPE_DOUBLE: { + double dv = 0; + dv = GET_DOUBLE_VAL(row[i]); + len += sprintf(str + len, "%lf", dv); + } break; + + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: { + int32_t charLen = varDataLen((char*)row[i] - VARSTR_HEADER_SIZE); + if (fields[i].type == TSDB_DATA_TYPE_BINARY) { + assert(charLen <= fields[i].bytes && charLen >= 0); + } else { + assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0); + } + + memcpy(str + len, row[i], charLen); + len += charLen; + } break; + + case TSDB_DATA_TYPE_TIMESTAMP: + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); + break; + + case TSDB_DATA_TYPE_BOOL: + len += sprintf(str + len, "%d", *((int8_t *)row[i])); + default: + break; + } + } + + return len; +} diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index d860d56f75..646964e319 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -529,7 +529,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) { if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || - pCmd->command == TSDB_SQL_RETRIEVE || + pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB || @@ -2700,7 +2700,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if ((pCmd->command == TSDB_SQL_RETRIEVE) || + if ((pCmd->command == TSDB_SQL_RETRIEVE_MNODE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) || (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && @@ -3158,6 +3158,9 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); + pRequest->body.pResInfo->pMsg = pMsg; + tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); return 0; } @@ -3217,26 +3220,70 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) SSchema* pSchema = pMetaMsg->pSchema; pMetaMsg->tuid = htobe64(pMetaMsg->tuid); for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { - pSchema->bytes = htons(pSchema->bytes); + pSchema->bytes = htonl(pSchema->bytes); pSchema++; } - STableMeta* pTableMeta = createTableMetaFromMsg(pMetaMsg); - SSchema *pTableSchema = pTableMeta->schema; - - TAOS_FIELD* pFields = calloc(1, pTableMeta->tableInfo.numOfColumns); - for (int16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i, ++pSchema) { - tstrncpy(pFields[i].name, pTableSchema[i].name, tListLen(pFields[i].name)); - pFields[i].type = pTableSchema[i].type; - pFields[i].bytes = pTableSchema[i].bytes; + pSchema = pMetaMsg->pSchema; + TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD)); + for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) { + tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name)); + pFields[i].type = pSchema[i].type; + pFields[i].bytes = pSchema[i].bytes; } -// pRequest->body.resultFields = pFields; -// pRequest->body.numOfFields = pTableMeta->tableInfo.numOfColumns; + if (pRequest->body.pResInfo == NULL) { + pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); + } + pRequest->body.pResInfo->pMsg = pMsg; + SClientResultInfo* pResInfo = pRequest->body.pResInfo; + + pResInfo->fields = pFields; + pResInfo->numOfCols = pMetaMsg->numOfColumns; + pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); + + pRequest->body.execId = pShow->showId; return 0; } +int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { + pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + pMsgBody->msgLen = sizeof(SRetrieveTableMsg); + pMsgBody->requestObjRefId = pRequest->self; + + SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); + pRetrieveMsg->showId = htonl(pRequest->body.execId); + + pMsgBody->pData = pRetrieveMsg; + return TSDB_CODE_SUCCESS; +} + +int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { + assert(msgLen >= sizeof(SRetrieveTableRsp)); + + tfree(pRequest->body.pResInfo->pMsg); + pRequest->body.pResInfo->pMsg = pMsg; + + SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; + pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); + pRetrieve->precision = htons(pRetrieve->precision); + + SClientResultInfo* pResInfo = pRequest->body.pResInfo; + pResInfo->numOfRows = pRetrieve->numOfRows; + pResInfo->pData = pRetrieve->data; // todo fix this in async model + + pResInfo->current = 0; + setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); + + tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, + pRetrieve->completed, pRequest->body.execId); + return 0; +} + + void initMsgHandleFp() { #if 0 tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; @@ -3273,7 +3320,7 @@ void initMsgHandleFp() { tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg; tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; @@ -3291,7 +3338,7 @@ void initMsgHandleFp() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; @@ -3321,4 +3368,6 @@ void initMsgHandleFp() { buildRequestMsgFp[TSDB_SQL_SHOW] = buildShowMsg; handleRequestRspFp[TSDB_SQL_SHOW] = processShowRsp; + buildRequestMsgFp[TSDB_SQL_RETRIEVE_MNODE] = buildRetrieveMnodeMsg; + handleRequestRspFp[TSDB_SQL_RETRIEVE_MNODE]= processRetrieveMnodeRsp; } \ No newline at end of file diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index e378ebede4..023bd6ebe9 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -127,8 +127,6 @@ void destroyTscObj(void *pObj) { atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); - - closeTransporter(pTscObj); pthread_mutex_destroy(&pTscObj->mutex); tfree(pTscObj); } @@ -190,6 +188,12 @@ static void doDestroyRequest(void* p) { tfree(pRequest->sqlstr); tfree(pRequest->pInfo); + if (pRequest->body.pResInfo != NULL) { + tfree(pRequest->body.pResInfo->pData); + tfree(pRequest->body.pResInfo->pMsg); + tfree(pRequest->body.pResInfo); + } + deregisterRequest(pRequest); tfree(pRequest); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 62d2cb5b18..46fd76234e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -23,6 +23,7 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" +#include "../inc/clientInt.h" #include "taos.h" namespace { @@ -34,19 +35,62 @@ int main(int argc, char** argv) { } TEST(testCase, driverInit_Test) { + taos_init(); +} + +TEST(testCase, connect_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + taos_close(pConn); +} + +TEST(testCase, create_user_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +//TEST(testCase, show_user_Test) { +// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "show users"); +// TAOS_ROW pRow = NULL; +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); // } // -// taos_free_result(pRes); +// taos_close(pConn); +//} - TAOS_RES* pRes = taos_query(pConn, "show users"); - TAOS_ROW pRow = taos_fetch_row(pRes); - assert(pRow != NULL); +TEST(testCase, show_db_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show databases"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } taos_close(pConn); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 0eaa184907..7c53a4ebd0 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -143,22 +143,22 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "create_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); strcpy(pMeta->tbFname, mndShowStr(pShow->type)); pShow->numOfColumns = cols; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 9f2b2d17f3..010b1abb78 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -709,100 +709,100 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "replica"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "quorum"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "days"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "keep0,keep1,keep2"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "cache(MB)"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "cache"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "blocks"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "minrows"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "maxrows"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 1; pSchema[cols].type = TSDB_DATA_TYPE_TINYINT; strcpy(pSchema[cols].name, "wallevel"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "fsync"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 1; pSchema[cols].type = TSDB_DATA_TYPE_TINYINT; strcpy(pSchema[cols].name, "comp"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 1; pSchema[cols].type = TSDB_DATA_TYPE_TINYINT; strcpy(pSchema[cols].name, "cachelast"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "precision"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 1; pSchema[cols].type = TSDB_DATA_TYPE_TINYINT; strcpy(pSchema[cols].name, "update"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ec5f68a713..32c1689589 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -552,16 +552,16 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; tstrncpy(pSchema[cols].name, "name", sizeof(pSchema[cols].name)); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; tstrncpy(pSchema[cols].name, "value", sizeof(pSchema[cols].name)); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; @@ -629,46 +629,46 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "end point"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "endpoint"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "vnodes"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "max vnodes"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "max_vnodes"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "status"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "offline reason"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "offline_reason"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index b7158bb094..a513c9d57d 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -380,46 +380,46 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "comment"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "aggregate"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "outputtype"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "create_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "code_len"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "bufsize"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7b6804d43e..16733cbf94 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -368,34 +368,34 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "end point"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "endpoint"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "role"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "role time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "role_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 45a63f2dc5..bf657fd27a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -474,48 +474,48 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "connId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; // app name pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "program"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; // app pid pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "pid"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "login_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "last_access"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; @@ -602,88 +602,88 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "queryId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "connId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 24; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "qid"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "created_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; strcpy(pSchema[cols].name, "time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "sql_obj_id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "pid"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ep"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 1; pSchema[cols].type = TSDB_DATA_TYPE_BOOL; strcpy(pSchema[cols].name, "stable_query"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "sub_queries"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "sub_query_info"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "sql"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; @@ -818,64 +818,64 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "streamId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "connId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "dest table"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "destination"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "exec time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "exec"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; strcpy(pSchema[cols].name, "time(us)"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "sql"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "cycles"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 791b6f5d12..e3b64b8b5c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -570,28 +570,28 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c deleted file mode 100644 index 59161b32f2..0000000000 --- a/source/dnode/mnode/impl/src/mndSync.c +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" -#include "mndTrans.h" - -int32_t mndInitSync(SMnode *pMnode) { return 0; } -void mndCleanupSync(SMnode *pMnode) {} - -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { - int32_t code = 0; - - // int32_t len = sdbGetRawTotalSize(pRaw); - // SSdbRaw *pReceived = calloc(1, len); - // memcpy(pReceived, pRaw, len); - // mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); - - // mndTransApply(pMnode, pReceived, code); - return code; -} - -bool mndIsMaster(SMnode *pMnode) { return true; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 5fa4fe5359..c97e1ff7d6 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -410,25 +410,25 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "privilege"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "account"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pMeta->numOfColumns = htonl(cols); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 9a462c024b..fd711f7109 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -203,30 +203,30 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "vgId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "tables"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; for (int32_t i = 0; i < pShow->replica; ++i) { pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; } - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; @@ -309,16 +309,16 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "vgId"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "status"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 1d883974b8..5e08859a66 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -32,12 +32,6 @@ bool qIsInsertSql(const char* pStr, size_t length) { } int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen) { - SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); - if (pQueryInfo == NULL) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. - return terrno; - } - SSqlInfo info = doGenerateAST(pStr); if (!info.valid) { strncpy(msg, info.msg, msgLen); @@ -51,6 +45,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ // do nothing } } else { + SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); + if (pQueryInfo == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. + return terrno; + } + struct SCatalog* pCatalog = NULL; int32_t code = catalogGetHandle(NULL, &pCatalog); code = qParserValidateSqlNode(pCatalog, &info, pQueryInfo, id, msg, msgLen); @@ -59,6 +59,7 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ } } + destroySqlInfo(&info); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 9d99b568a5..5b26d5bd02 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -305,7 +305,7 @@ void msgInit() { tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg; tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; @@ -323,7 +323,7 @@ void msgInit() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index f892b4d8c0..cb231e15a0 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -1120,11 +1120,11 @@ SysNameInfo taosGetSysNameInfo() { struct utsname uts; if (!uname(&uts)) { - info.sysname = strdup(uts.sysname); - info.nodename = strdup(uts.nodename); - info.release = strdup(uts.release); - info.version = strdup(uts.version); - info.machine = strdup(uts.machine); + tstrncpy(info.sysname, uts.sysname, sizeof(info.sysname)); + tstrncpy(info.nodename, uts.nodename, sizeof(info.nodename)); + tstrncpy(info.release, uts.release, sizeof(info.release)); + tstrncpy(info.version, uts.version, sizeof(info.version)); + tstrncpy(info.machine, uts.machine, sizeof(info.machine)); } return info; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 4a621d47c0..b2f8eda474 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -183,7 +183,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pSql->fp = fp; if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH; } if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { @@ -265,7 +265,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { } return; - } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) { + } else if (pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) { // in case of show command, return no data (*pSql->fetchFp)(param, pSql, 0); } else { @@ -273,7 +273,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { } } else { // current query is not completed, continue retrieve from node if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH; } SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 0b4f399a1f..dcfb2d6a87 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -322,7 +322,7 @@ TAOS_ROW tscFetchRow(void *param) { // current data set are exhausted, fetch more data from node if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && - (pCmd->command == TSDB_SQL_RETRIEVE || + (pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index caa334aaed..7a754ee698 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -539,7 +539,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) { if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || - pCmd->command == TSDB_SQL_RETRIEVE || + pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB || @@ -2749,7 +2749,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { } STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if ((pCmd->command == TSDB_SQL_RETRIEVE) || + if ((pCmd->command == TSDB_SQL_RETRIEVE_MNODE) || ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) || (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && @@ -3174,7 +3174,7 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg; - tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg; + tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg; tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg; tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg; @@ -3192,7 +3192,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp; tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp; @@ -3214,7 +3214,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; tscKeepConn[TSDB_SQL_SHOW] = 1; - tscKeepConn[TSDB_SQL_RETRIEVE] = 1; + tscKeepConn[TSDB_SQL_RETRIEVE_MNODE] = 1; tscKeepConn[TSDB_SQL_SELECT] = 1; tscKeepConn[TSDB_SQL_FETCH] = 1; tscKeepConn[TSDB_SQL_HB] = 1; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5fdaad0d66..ab1fffd5a2 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -460,7 +460,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) { SSqlCmd *pCmd = &pSql->cmd; return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && - (pCmd->command == TSDB_SQL_RETRIEVE || + (pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || @@ -582,10 +582,10 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) && (cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_SHOW || - cmd == TSDB_SQL_RETRIEVE || + cmd == TSDB_SQL_RETRIEVE_MNODE || cmd == TSDB_SQL_FETCH)) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH; tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]); tscBuildAndSendRequest(pSql, NULL); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 275042a238..24a6377a73 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1670,7 +1670,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSql1->fp = joinRetrieveFinalResCallback; if (pCmd1->command < TSDB_SQL_LOCAL) { - pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH; } tscBuildAndSendRequest(pSql1, NULL); From 8f7a109851dc5ea5fa27c3aed67d1e75192660ef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Dec 2021 11:48:23 +0800 Subject: [PATCH 8/8] [td-11818] add new api implementation. --- source/client/src/clientMain.c | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e3ec7c27ee..21e632db8d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -242,3 +242,28 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) return len; } + +int* taos_fetch_lengths(TAOS_RES *res) { + if (res == NULL) { + return NULL; + } + + return ((SRequestObj*) res)->body.pResInfo->length; +} + +const char *taos_data_type(int type) { + switch (type) { + case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL"; + case TSDB_DATA_TYPE_BOOL: return "TSDB_DATA_TYPE_BOOL"; + case TSDB_DATA_TYPE_TINYINT: return "TSDB_DATA_TYPE_TINYINT"; + case TSDB_DATA_TYPE_SMALLINT: return "TSDB_DATA_TYPE_SMALLINT"; + case TSDB_DATA_TYPE_INT: return "TSDB_DATA_TYPE_INT"; + case TSDB_DATA_TYPE_BIGINT: return "TSDB_DATA_TYPE_BIGINT"; + case TSDB_DATA_TYPE_FLOAT: return "TSDB_DATA_TYPE_FLOAT"; + case TSDB_DATA_TYPE_DOUBLE: return "TSDB_DATA_TYPE_DOUBLE"; + case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_BINARY"; + case TSDB_DATA_TYPE_TIMESTAMP: return "TSDB_DATA_TYPE_TIMESTAMP"; + case TSDB_DATA_TYPE_NCHAR: return "TSDB_DATA_TYPE_NCHAR"; + default: return "UNKNOWN"; + } +}