Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
77069e5299
|
@ -80,8 +80,8 @@ typedef enum {
|
|||
|
||||
typedef struct taosField {
|
||||
char name[65];
|
||||
uint8_t type;
|
||||
int16_t bytes;
|
||||
int8_t type;
|
||||
int32_t bytes;
|
||||
} TAOS_FIELD;
|
||||
|
||||
#ifdef _TD_GO_DLL_
|
||||
|
|
|
@ -571,14 +571,13 @@ typedef struct {
|
|||
} SRetrieveTableMsg;
|
||||
|
||||
typedef struct SRetrieveTableRsp {
|
||||
int32_t numOfRows;
|
||||
int64_t offset; // updated offset value for multi-vnode projection query
|
||||
int64_t useconds;
|
||||
int8_t completed; // all results are returned to client
|
||||
int8_t precision;
|
||||
int8_t compressed;
|
||||
int8_t reserved;
|
||||
int32_t compLen;
|
||||
|
||||
int32_t numOfRows;
|
||||
char data[];
|
||||
} SRetrieveTableRsp;
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ enum {
|
|||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_DNODE, "cfg-dnode" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_MNODE, "cfg-mnode" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW, "show" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE, "retrieve" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_MNODE, "retrieve" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" )
|
||||
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" )
|
||||
|
|
|
@ -32,8 +32,7 @@ extern "C" {
|
|||
struct SCatalog;
|
||||
|
||||
typedef struct SCatalogReq {
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
SArray *pTableName; // table full name
|
||||
SArray *pTableName; // element is SNAME
|
||||
SArray *pUdf; // udf name
|
||||
bool qNodeRequired; // valid qnode
|
||||
} SCatalogReq;
|
||||
|
@ -54,10 +53,10 @@ typedef struct SCatalogCfg {
|
|||
int32_t catalogInit(SCatalogCfg *cfg);
|
||||
|
||||
/**
|
||||
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
|
||||
* There is ONLY one SCatalog object for one process space, and this function returns a singleton.
|
||||
* @param clusterId
|
||||
* @return
|
||||
* Get a cluster's catalog handle for all later operations.
|
||||
* @param clusterId (input, end with \0)
|
||||
* @param catalogHandle (output, NO need to free it)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
||||
|
||||
|
@ -65,29 +64,75 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
|||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
|
||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
|
||||
/**
|
||||
* Get a table's meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pDBName (input, full db name)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pDBName (input, full db name)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data and get the new one.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pDBName (input, full db name)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
* get table's vgroup list.
|
||||
* @param clusterId
|
||||
* @pVgroupList - array of SVgroupInfo
|
||||
* @return
|
||||
* Get a table's actual vgroup, for stable it's all possible vgroup list.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pDBName (input, full db name)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
|
||||
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
|
||||
|
||||
/**
|
||||
* Get a table's vgroup from its name's hash value.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pDBName (input, full db name)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param vgInfo (output, vgroup info)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
|
||||
|
||||
|
||||
/**
|
||||
* Get the required meta data from mnode.
|
||||
* Note that this is a synchronized API and is also thread-safety.
|
||||
* @param pCatalog
|
||||
* @param pMgmtEps
|
||||
* @param pMetaReq
|
||||
* @param pMetaData
|
||||
* @return
|
||||
* Get all meta data required in pReq.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pReq (input, reqest info)
|
||||
* @param pRsp (output, response data)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
|
||||
|
||||
|
@ -98,7 +143,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, S
|
|||
|
||||
/**
|
||||
* Destroy catalog and relase all resources
|
||||
* @param pCatalog
|
||||
*/
|
||||
void catalogDestroy(void);
|
||||
|
||||
|
|
|
@ -28,6 +28,15 @@ typedef struct SIndexOpts SIndexOpts;
|
|||
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
|
||||
typedef struct SArray SIndexMultiTerm;
|
||||
|
||||
typedef enum {
|
||||
ADD_VALUE, // add index colume value
|
||||
DEL_VALUE, // delete index column value
|
||||
UPDATE_VALUE, // update index column value
|
||||
ADD_INDEX, // add index on specify column
|
||||
DROP_INDEX, // drop existed index
|
||||
DROP_SATBLE // drop stable
|
||||
} SIndexColumnType;
|
||||
|
||||
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
|
||||
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType;
|
||||
/*
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "tarray.h"
|
||||
#include "thash.h"
|
||||
#include "tlog.h"
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
|
@ -83,6 +84,19 @@ typedef struct STableMetaOutput {
|
|||
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
||||
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
|
||||
|
||||
extern void msgInit();
|
||||
|
||||
|
||||
extern int32_t qDebugFlag;
|
||||
|
||||
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -20,6 +20,12 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "planner.h"
|
||||
|
||||
typedef struct SSchedulerCfg {
|
||||
|
||||
} SSchedulerCfg;
|
||||
|
||||
typedef struct SQueryProfileSummary {
|
||||
int64_t startTs; // Object created and added into the message queue
|
||||
int64_t endTs; // the timestamp when the task is completed
|
||||
|
@ -43,43 +49,23 @@ typedef struct SQueryProfileSummary {
|
|||
uint64_t resultSize; // generated result size in Kb.
|
||||
} SQueryProfileSummary;
|
||||
|
||||
typedef struct SQueryTask {
|
||||
uint64_t queryId; // query id
|
||||
uint64_t taskId; // task id
|
||||
char *pSubplan; // operator tree
|
||||
uint64_t status; // task status
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
|
||||
} SQueryTask;
|
||||
|
||||
typedef struct SQueryJob {
|
||||
SArray **pSubtasks;
|
||||
// todo
|
||||
} SQueryJob;
|
||||
|
||||
/**
|
||||
* Process the query job, generated according to the query physical plan.
|
||||
* This is a synchronized API, and is also thread-safety.
|
||||
* @param pJob
|
||||
* @return
|
||||
*/
|
||||
int32_t qProcessQueryJob(struct SQueryJob* pJob);
|
||||
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob);
|
||||
|
||||
int32_t scheduleFetchRows(void *pJob, void *data);
|
||||
|
||||
/**
|
||||
* The SSqlObj should not be here????
|
||||
* @param pSql
|
||||
* @param pVgroupId
|
||||
* @param pRetVgroupId
|
||||
* @return
|
||||
*/
|
||||
//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId);
|
||||
|
||||
/**
|
||||
* Cancel query job
|
||||
* @param pJob
|
||||
* @return
|
||||
*/
|
||||
int32_t qKillQueryJob(struct SQueryJob* pJob);
|
||||
int32_t scheduleCancelJob(void *pJob);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ extern "C" {
|
|||
#include <math.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/utsname.h>
|
||||
#include <dirent.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#define TSDB_LOCALE_LEN 64
|
||||
#define TSDB_TIMEZONE_LEN 96
|
||||
|
||||
|
@ -57,11 +59,11 @@ char * taosGetCmdlineByPID(int pid);
|
|||
void taosSetCoreDump(bool enable);
|
||||
|
||||
typedef struct {
|
||||
const char *sysname;
|
||||
const char *nodename;
|
||||
const char *release;
|
||||
const char *version;
|
||||
const char *machine;
|
||||
char sysname[_UTSNAME_MACHINE_LENGTH];
|
||||
char nodename[_UTSNAME_MACHINE_LENGTH];
|
||||
char release[_UTSNAME_MACHINE_LENGTH];
|
||||
char version[_UTSNAME_MACHINE_LENGTH];
|
||||
char machine[_UTSNAME_MACHINE_LENGTH];
|
||||
} SysNameInfo;
|
||||
|
||||
SysNameInfo taosGetSysNameInfo();
|
||||
|
|
|
@ -308,6 +308,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
|
||||
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition")
|
||||
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error")
|
||||
#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input")
|
||||
|
||||
|
||||
// grant
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_MACRO_H_
|
||||
#define _TD_UTIL_MACRO_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// Module init/clear MACRO definitions
|
||||
#define TD_MOD_UNINITIALIZED 0
|
||||
#define TD_MOD_INITIALIZED 1
|
||||
|
||||
#define TD_MOD_UNCLEARD 0
|
||||
#define TD_MOD_CLEARD 1
|
||||
|
||||
typedef int8_t td_mode_flag_t;
|
||||
|
||||
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
|
||||
|
||||
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_MACRO_H_*/
|
|
@ -20,8 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <common.h>
|
||||
#include "taos.h"
|
||||
#include "common.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tdef.h"
|
||||
#include "tep.h"
|
||||
|
@ -88,9 +88,15 @@ typedef struct STscObj {
|
|||
} STscObj;
|
||||
|
||||
typedef struct SClientResultInfo {
|
||||
SSDataBlock *pData;
|
||||
TAOS_FIELD *resultFields;
|
||||
const char *pMsg;
|
||||
const char *pData;
|
||||
TAOS_FIELD *fields;
|
||||
int32_t numOfCols;
|
||||
int32_t numOfRows;
|
||||
int32_t current;
|
||||
int32_t *length;
|
||||
TAOS_ROW row;
|
||||
char **pCol;
|
||||
} SClientResultInfo;
|
||||
|
||||
typedef struct SReqBody {
|
||||
|
@ -98,6 +104,7 @@ typedef struct SReqBody {
|
|||
void* fp;
|
||||
void* param;
|
||||
int32_t paramLen;
|
||||
int64_t execId; // showId/queryId
|
||||
SClientResultInfo* pResInfo;
|
||||
} SRequestBody;
|
||||
|
||||
|
@ -152,6 +159,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
|
||||
|
||||
void* doFetchRow(SRequestObj* pRequest);
|
||||
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
|
||||
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
||||
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
|
||||
static void destroyConnectMsg(SRequestMsgBody* pMsgBody);
|
||||
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody);
|
||||
|
||||
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
|
||||
|
||||
|
@ -99,17 +99,19 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
|
||||
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
||||
|
||||
SAppInstInfo* pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
||||
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
||||
if (pInst == NULL) {
|
||||
pInst = calloc(1, sizeof(struct SAppInstInfo));
|
||||
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
|
||||
|
||||
pInst->mgmtEp = epSet;
|
||||
pInst->pTransporter = openTransporter(user, secretEncrypt);
|
||||
p->mgmtEp = epSet;
|
||||
p->pTransporter = openTransporter(user, secretEncrypt);
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &pInst, POINTER_BYTES);
|
||||
pInst = &p;
|
||||
}
|
||||
|
||||
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, pInst);
|
||||
tfree(key);
|
||||
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
|
||||
}
|
||||
|
||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||
|
@ -163,7 +165,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyConnectMsg(&body);
|
||||
destroyRequestMsgBody(&body);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
|
@ -234,7 +236,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyConnectMsg(&body);
|
||||
destroyRequestMsgBody(&body);
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
|
||||
|
@ -281,7 +283,7 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void destroyConnectMsg(SRequestMsgBody* pMsgBody) {
|
||||
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
tfree(pMsgBody->pData);
|
||||
}
|
||||
|
@ -337,7 +339,14 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
|
||||
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
|
||||
if (handleRequestRspFp[pRequest->type]) {
|
||||
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen);
|
||||
char *p = malloc(pMsg->contLen);
|
||||
if (p == NULL) {
|
||||
pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
terrno = pRequest->code;
|
||||
} else {
|
||||
memcpy(p, pMsg->pCont, pMsg->contLen);
|
||||
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType],
|
||||
|
@ -381,15 +390,48 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
assert(pRequest != NULL);
|
||||
SClientResultInfo* pResultInfo = pRequest->body.pResInfo;
|
||||
|
||||
if (pResultInfo == NULL || pResultInfo->current >= pResultInfo->pData->info.rows) {
|
||||
if (pResultInfo == NULL) {
|
||||
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
|
||||
// pRequest->body.pResInfo.
|
||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||
pRequest->type = TSDB_SQL_RETRIEVE_MNODE;
|
||||
|
||||
SRequestMsgBody body = {0};
|
||||
buildRequestMsgFp[pRequest->type](pRequest, &body);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyRequestMsgBody(&body);
|
||||
|
||||
pResultInfo->current = 0;
|
||||
if (pResultInfo->numOfRows <= pResultInfo->current) {
|
||||
return NULL;
|
||||
}
|
||||
// current data set are exhausted, fetch more result from node
|
||||
// if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) {
|
||||
// taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
|
||||
// tsem_wait(&pSql->rspSem);
|
||||
// }
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
|
||||
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
|
||||
pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
|
||||
}
|
||||
}
|
||||
|
||||
pResultInfo->current += 1;
|
||||
return pResultInfo->row;
|
||||
}
|
||||
|
||||
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
|
||||
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
|
||||
if (numOfRows == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pResultInfo->length[i] = pResultInfo->fields[i].bytes;
|
||||
pResultInfo->row[i] = pResultInfo->pData + offset * pResultInfo->numOfRows;
|
||||
pResultInfo->pCol[i] = pResultInfo->row[i];
|
||||
offset += pResultInfo->fields[i].bytes;
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
|
|||
}
|
||||
|
||||
int ret = taos_options_imp(option, (const char*)arg);
|
||||
|
||||
atomic_store_32(&lock, 0);
|
||||
return ret;
|
||||
}
|
||||
|
@ -58,7 +57,7 @@ void taos_cleanup(void) {
|
|||
}
|
||||
|
||||
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
||||
int32_t p = (port != 0)? port:tsServerPort;
|
||||
int32_t p = (port != 0) ? port : tsServerPort;
|
||||
|
||||
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db);
|
||||
if (user == NULL) {
|
||||
|
@ -110,6 +109,34 @@ void taos_free_result(TAOS_RES *res) {
|
|||
destroyRequest(pRequest);
|
||||
}
|
||||
|
||||
int taos_field_count(TAOS_RES *res) {
|
||||
if (res == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*) res;
|
||||
|
||||
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
|
||||
if (pResInfo == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return pResInfo->numOfCols;
|
||||
}
|
||||
|
||||
int taos_num_fields(TAOS_RES *res) {
|
||||
return taos_field_count(res);
|
||||
}
|
||||
|
||||
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||
if (taos_num_fields(res) == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo;
|
||||
return pResInfo->fields;
|
||||
}
|
||||
|
||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||
if (taos == NULL || sql == NULL) {
|
||||
return NULL;
|
||||
|
@ -131,3 +158,112 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
|||
|
||||
return doFetchRow(pRequest);
|
||||
}
|
||||
|
||||
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
|
||||
int32_t len = 0;
|
||||
for (int i = 0; i < num_fields; ++i) {
|
||||
if (i > 0) {
|
||||
str[len++] = ' ';
|
||||
}
|
||||
|
||||
if (row[i] == NULL) {
|
||||
len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (fields[i].type) {
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float fv = 0;
|
||||
fv = GET_FLOAT_VAL(row[i]);
|
||||
len += sprintf(str + len, "%f", fv);
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double dv = 0;
|
||||
dv = GET_DOUBLE_VAL(row[i]);
|
||||
len += sprintf(str + len, "%lf", dv);
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
int32_t charLen = varDataLen((char*)row[i] - VARSTR_HEADER_SIZE);
|
||||
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
|
||||
assert(charLen <= fields[i].bytes && charLen >= 0);
|
||||
} else {
|
||||
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
|
||||
}
|
||||
|
||||
memcpy(str + len, row[i], charLen);
|
||||
len += charLen;
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int* taos_fetch_lengths(TAOS_RES *res) {
|
||||
if (res == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return ((SRequestObj*) res)->body.pResInfo->length;
|
||||
}
|
||||
|
||||
const char *taos_data_type(int type) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL";
|
||||
case TSDB_DATA_TYPE_BOOL: return "TSDB_DATA_TYPE_BOOL";
|
||||
case TSDB_DATA_TYPE_TINYINT: return "TSDB_DATA_TYPE_TINYINT";
|
||||
case TSDB_DATA_TYPE_SMALLINT: return "TSDB_DATA_TYPE_SMALLINT";
|
||||
case TSDB_DATA_TYPE_INT: return "TSDB_DATA_TYPE_INT";
|
||||
case TSDB_DATA_TYPE_BIGINT: return "TSDB_DATA_TYPE_BIGINT";
|
||||
case TSDB_DATA_TYPE_FLOAT: return "TSDB_DATA_TYPE_FLOAT";
|
||||
case TSDB_DATA_TYPE_DOUBLE: return "TSDB_DATA_TYPE_DOUBLE";
|
||||
case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_BINARY";
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: return "TSDB_DATA_TYPE_TIMESTAMP";
|
||||
case TSDB_DATA_TYPE_NCHAR: return "TSDB_DATA_TYPE_NCHAR";
|
||||
default: return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -529,7 +529,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) {
|
|||
|
||||
if (pCmd->command == TSDB_SQL_SELECT ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_MNODE ||
|
||||
pCmd->command == TSDB_SQL_INSERT ||
|
||||
pCmd->command == TSDB_SQL_CONNECT ||
|
||||
pCmd->command == TSDB_SQL_HB ||
|
||||
|
@ -2700,7 +2700,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if ((pCmd->command == TSDB_SQL_RETRIEVE) ||
|
||||
if ((pCmd->command == TSDB_SQL_RETRIEVE_MNODE) ||
|
||||
((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) ||
|
||||
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
|
@ -3158,6 +3158,9 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
|||
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
|
||||
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
|
||||
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
|
||||
pRequest->body.pResInfo->pMsg = pMsg;
|
||||
|
||||
tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns);
|
||||
return 0;
|
||||
}
|
||||
|
@ -3217,26 +3220,70 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
|
|||
SSchema* pSchema = pMetaMsg->pSchema;
|
||||
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
|
||||
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
||||
pSchema->bytes = htons(pSchema->bytes);
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
pSchema++;
|
||||
}
|
||||
|
||||
STableMeta* pTableMeta = createTableMetaFromMsg(pMetaMsg);
|
||||
SSchema *pTableSchema = pTableMeta->schema;
|
||||
|
||||
TAOS_FIELD* pFields = calloc(1, pTableMeta->tableInfo.numOfColumns);
|
||||
for (int16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i, ++pSchema) {
|
||||
tstrncpy(pFields[i].name, pTableSchema[i].name, tListLen(pFields[i].name));
|
||||
pFields[i].type = pTableSchema[i].type;
|
||||
pFields[i].bytes = pTableSchema[i].bytes;
|
||||
pSchema = pMetaMsg->pSchema;
|
||||
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
|
||||
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
||||
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
|
||||
pFields[i].type = pSchema[i].type;
|
||||
pFields[i].bytes = pSchema[i].bytes;
|
||||
}
|
||||
|
||||
// pRequest->body.resultFields = pFields;
|
||||
// pRequest->body.numOfFields = pTableMeta->tableInfo.numOfColumns;
|
||||
if (pRequest->body.pResInfo == NULL) {
|
||||
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
|
||||
}
|
||||
|
||||
pRequest->body.pResInfo->pMsg = pMsg;
|
||||
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
|
||||
|
||||
pResInfo->fields = pFields;
|
||||
pResInfo->numOfCols = pMetaMsg->numOfColumns;
|
||||
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
|
||||
|
||||
pRequest->body.execId = pShow->showId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
|
||||
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
||||
pMsgBody->msgLen = sizeof(SRetrieveTableMsg);
|
||||
pMsgBody->requestObjRefId = pRequest->self;
|
||||
|
||||
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
|
||||
pRetrieveMsg->showId = htonl(pRequest->body.execId);
|
||||
|
||||
pMsgBody->pData = pRetrieveMsg;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
|
||||
assert(msgLen >= sizeof(SRetrieveTableRsp));
|
||||
|
||||
tfree(pRequest->body.pResInfo->pMsg);
|
||||
pRequest->body.pResInfo->pMsg = pMsg;
|
||||
|
||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg;
|
||||
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
||||
pRetrieve->precision = htons(pRetrieve->precision);
|
||||
|
||||
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
|
||||
pResInfo->numOfRows = pRetrieve->numOfRows;
|
||||
pResInfo->pData = pRetrieve->data; // todo fix this in async model
|
||||
|
||||
pResInfo->current = 0;
|
||||
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||
|
||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
|
||||
pRetrieve->completed, pRequest->body.execId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void initMsgHandleFp() {
|
||||
#if 0
|
||||
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
||||
|
@ -3273,7 +3320,7 @@ void initMsgHandleFp() {
|
|||
|
||||
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
|
||||
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
|
||||
|
@ -3291,7 +3338,7 @@ void initMsgHandleFp() {
|
|||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
|
||||
|
@ -3321,4 +3368,6 @@ void initMsgHandleFp() {
|
|||
buildRequestMsgFp[TSDB_SQL_SHOW] = buildShowMsg;
|
||||
handleRequestRspFp[TSDB_SQL_SHOW] = processShowRsp;
|
||||
|
||||
buildRequestMsgFp[TSDB_SQL_RETRIEVE_MNODE] = buildRetrieveMnodeMsg;
|
||||
handleRequestRspFp[TSDB_SQL_RETRIEVE_MNODE]= processRetrieveMnodeRsp;
|
||||
}
|
|
@ -127,8 +127,6 @@ void destroyTscObj(void *pObj) {
|
|||
|
||||
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
|
||||
closeTransporter(pTscObj);
|
||||
pthread_mutex_destroy(&pTscObj->mutex);
|
||||
tfree(pTscObj);
|
||||
}
|
||||
|
@ -190,6 +188,12 @@ static void doDestroyRequest(void* p) {
|
|||
tfree(pRequest->sqlstr);
|
||||
tfree(pRequest->pInfo);
|
||||
|
||||
if (pRequest->body.pResInfo != NULL) {
|
||||
tfree(pRequest->body.pResInfo->pData);
|
||||
tfree(pRequest->body.pResInfo->pMsg);
|
||||
tfree(pRequest->body.pResInfo);
|
||||
}
|
||||
|
||||
deregisterRequest(pRequest);
|
||||
tfree(pRequest);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
|
||||
#include "../inc/clientInt.h"
|
||||
#include "taos.h"
|
||||
|
||||
namespace {
|
||||
|
@ -34,19 +35,62 @@ int main(int argc, char** argv) {
|
|||
}
|
||||
|
||||
TEST(testCase, driverInit_Test) {
|
||||
taos_init();
|
||||
}
|
||||
|
||||
TEST(testCase, connect_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, show_user_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
// TAOS_ROW pRow = NULL;
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
TAOS_ROW pRow = taos_fetch_row(pRes);
|
||||
assert(pRow != NULL);
|
||||
TEST(testCase, show_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
|
@ -143,19 +143,19 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -735,13 +735,13 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
|
|||
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
|
@ -753,85 +753,85 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
|
|||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "replica");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "quorum");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "days");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "keep0,keep1,keep2");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "cache(MB)");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "cache");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "blocks");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "minrows");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "maxrows");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "wallevel");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "fsync");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "comp");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "cachelast");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "precision");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "update");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -561,13 +561,13 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
|
|||
pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
tstrncpy(pSchema[cols].name, "name", sizeof(pSchema[cols].name));
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
tstrncpy(pSchema[cols].name, "value", sizeof(pSchema[cols].name));
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
@ -638,43 +638,43 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
|||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "end point");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "endpoint");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "vnodes");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "max vnodes");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "max_vnodes");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "status");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "offline reason");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "offline_reason");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -380,43 +380,43 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
|
|||
pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "comment");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "aggregate");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "outputtype");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "code_len");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "bufsize");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -368,31 +368,31 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
|||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "end point");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "endpoint");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "role");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "role time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "role_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -474,45 +474,45 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "connId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "user");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
// app name
|
||||
pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "program");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
// app pid
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "pid");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "login_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "last_access");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
@ -602,85 +602,85 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "queryId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "connId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "user");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 24;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "qid");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "created_time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(pSchema[cols].name, "time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sql_obj_id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "pid");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ep");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
|
||||
strcpy(pSchema[cols].name, "stable_query");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "sub_queries");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sub_query_info");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sql");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
@ -818,61 +818,61 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "streamId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "connId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "user");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "dest table");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "destination");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "created time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "exec time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "exec");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(pSchema[cols].name, "time(us)");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sql");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "cycles");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -590,25 +590,25 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM
|
|||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "columns");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "tags");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "mndInt.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
int32_t mndInitSync(SMnode *pMnode) { return 0; }
|
||||
void mndCleanupSync(SMnode *pMnode) {}
|
||||
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||
int32_t code = 0;
|
||||
|
||||
// int32_t len = sdbGetRawTotalSize(pRaw);
|
||||
// SSdbRaw *pReceived = calloc(1, len);
|
||||
// memcpy(pReceived, pRaw, len);
|
||||
// mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
|
||||
|
||||
// mndTransApply(pMnode, pReceived, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
bool mndIsMaster(SMnode *pMnode) { return true; }
|
|
@ -410,25 +410,25 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
|
|||
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "privilege");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "account");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -300,26 +300,26 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "vgId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "tables");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
for (int32_t i = 0; i < pShow->replica; ++i) {
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
}
|
||||
|
||||
|
@ -402,13 +402,13 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
|
|||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "vgId");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "status");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
|
|
|
@ -54,15 +54,30 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
|
|||
return code;
|
||||
}
|
||||
|
||||
char *pMsg = rpcMallocCont(msgLen);
|
||||
if (NULL == pMsg) {
|
||||
ctgError("rpc malloc %d failed", msgLen);
|
||||
tfree(msg);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
}
|
||||
|
||||
memcpy(pMsg, msg, msgLen);
|
||||
|
||||
tfree(msg);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TSDB_MSG_TYPE_USE_DB,
|
||||
.pCont = msg,
|
||||
.pCont = pMsg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
ctgError("error rsp for use db, code:%x", rpcRsp.code);
|
||||
return rpcRsp.code;
|
||||
}
|
||||
|
||||
code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||
if (code) {
|
||||
|
@ -171,7 +186,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
|
|||
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code);
|
||||
ctgError("error rsp for table meta, code:%x", rpcRsp.code);
|
||||
return rpcRsp.code;
|
||||
}
|
||||
|
||||
|
@ -254,24 +269,6 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, co
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
SDBVgroupInfo dbInfo = {0};
|
||||
int32_t code = 0;
|
||||
int32_t vgId = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
|
||||
|
||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
||||
STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
|
||||
assert(pChild != NULL);
|
||||
|
@ -554,7 +551,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
|
||||
CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
||||
|
||||
STableMetaOutput output = {0};
|
||||
|
||||
|
@ -571,7 +568,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
|
|||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
|
||||
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
}
|
||||
|
@ -607,6 +604,24 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
|
||||
SDBVgroupInfo dbInfo = {0};
|
||||
int32_t code = 0;
|
||||
int32_t vgId = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
|
||||
|
||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||
return TSDB_CODE_TSC_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
|
||||
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||
|
|
|
@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
|||
ADD_EXECUTABLE(catalogTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(
|
||||
catalogTest
|
||||
PUBLIC os util common catalog transport gtest query
|
||||
PUBLIC os util common catalog transport gtest query taos
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
|
|
|
@ -27,15 +27,81 @@
|
|||
#include "tdef.h"
|
||||
#include "tvariant.h"
|
||||
#include "catalog.h"
|
||||
#include "tep.h"
|
||||
#include "trpc.h"
|
||||
|
||||
typedef struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
} SAppInstInfo;
|
||||
|
||||
typedef struct STscObj {
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_PASSWORD_LEN];
|
||||
char acctId[TSDB_ACCT_ID_LEN];
|
||||
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
|
||||
uint32_t connId;
|
||||
uint64_t id; // ref ID returned by taosAddRef
|
||||
// struct SSqlObj *sqlList;
|
||||
void *pTransporter;
|
||||
pthread_mutex_t mutex; // used to protect the operation on db
|
||||
int32_t numOfReqs; // number of sqlObj from this tscObj
|
||||
SAppInstInfo *pAppInfo;
|
||||
} STscObj;
|
||||
|
||||
namespace {
|
||||
|
||||
void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
|
||||
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
|
||||
strcpy(pReq->db, "1.db1");
|
||||
pReq->numOfVgroups = htonl(2);
|
||||
pReq->cacheBlockSize = htonl(16);
|
||||
pReq->totalBlocks = htonl(10);
|
||||
pReq->daysPerFile = htonl(10);
|
||||
pReq->daysToKeep0 = htonl(3650);
|
||||
pReq->daysToKeep1 = htonl(3650);
|
||||
pReq->daysToKeep2 = htonl(3650);
|
||||
pReq->minRowsPerFileBlock = htonl(100);
|
||||
pReq->maxRowsPerFileBlock = htonl(4096);
|
||||
pReq->commitTime = htonl(3600);
|
||||
pReq->fsyncPeriod = htonl(3000);
|
||||
pReq->walLevel = 1;
|
||||
pReq->precision = 0;
|
||||
pReq->compression = 2;
|
||||
pReq->replications = 1;
|
||||
pReq->quorum = 1;
|
||||
pReq->update = 0;
|
||||
pReq->cacheLastRow = 0;
|
||||
pReq->ignoreExist = 1;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = pReq;
|
||||
rpcMsg.contLen = sizeof(SCreateDbMsg);
|
||||
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(shandle, pEpSet, &rpcMsg, &rpcRsp);
|
||||
|
||||
ASSERT_EQ(rpcRsp.code, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST(testCase, normalCase) {
|
||||
STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
char *clusterId = "cluster1";
|
||||
char *dbname = "1.db1";
|
||||
char *tablename = "table1";
|
||||
struct SCatalog* pCtg = NULL;
|
||||
void *mockPointer = (void *)0x1;
|
||||
SVgroupInfo vgInfo = {0};
|
||||
|
||||
msgInit();
|
||||
|
||||
sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||
|
||||
int32_t code = catalogInit(NULL);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
@ -43,105 +109,12 @@ TEST(testCase, normalCase) {
|
|||
code = catalogGetHandle(clusterId, &pCtg);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
TEST(testCase, normalCase) {
|
||||
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
|
||||
ASSERT_EQ(info1.valid, true);
|
||||
|
||||
char msg[128] = {0};
|
||||
SMsgBuf buf;
|
||||
buf.len = 128;
|
||||
buf.buf = msg;
|
||||
|
||||
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
|
||||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SCatalogReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
||||
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||
setTableMetaInfo(pQueryInfo, &req);
|
||||
|
||||
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
|
||||
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
|
||||
ASSERT_EQ(ret, 0);
|
||||
|
||||
SArray* pExprList = pQueryInfo->exprList[0];
|
||||
|
||||
int32_t num = tsCompatibleModel? 2:1;
|
||||
ASSERT_EQ(taosArrayGetSize(pExprList), num);
|
||||
|
||||
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
|
||||
ASSERT_EQ(p1->base.pColumns->uid, 110);
|
||||
ASSERT_EQ(p1->base.numOfParams, 1);
|
||||
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
|
||||
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.interBytes, 16);
|
||||
|
||||
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
|
||||
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
|
||||
|
||||
tExprNode* pParam = p1->pExpr->_function.pChild[0];
|
||||
|
||||
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
|
||||
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
|
||||
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
|
||||
|
||||
struct SQueryPlanNode* n = nullptr;
|
||||
code = createQueryPlan(pQueryInfo, &n);
|
||||
|
||||
char* str = NULL;
|
||||
queryPlanToString(n, &str);
|
||||
printf("%s\n", str);
|
||||
|
||||
destroyQueryInfo(pQueryInfo);
|
||||
qParserClearupMetaRequestInfo(&req);
|
||||
destroySqlInfo(&info1);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, displayPlan) {
|
||||
generateLogicplan("select count(*) from `t.1abc`");
|
||||
generateLogicplan("select count(*)+ 22 from `t.1abc`");
|
||||
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
|
||||
generateLogicplan("select count(*) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(A+B) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
|
||||
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
|
||||
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
|
||||
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
|
||||
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
|
||||
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
|
||||
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
|
||||
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
|
||||
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
|
||||
|
||||
// order by + group by column + limit offset
|
||||
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
|
||||
|
||||
// fill
|
||||
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
|
||||
|
||||
// union + union all
|
||||
|
||||
|
||||
|
||||
// join
|
||||
|
||||
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
|
||||
// Projection(cols: [a+b #5059]) filters:(nil)
|
||||
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
|
||||
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
#define _TD_INDEX_INT_H_
|
||||
|
||||
#include "index.h"
|
||||
#include "index_fst.h"
|
||||
#include "tlog.h"
|
||||
#include "thash.h"
|
||||
#include "taos.h"
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
#include <lucene++/Lucene_c.h>
|
||||
|
@ -32,12 +35,20 @@ struct SIndex {
|
|||
#ifdef USE_LUCENE
|
||||
index_t *index;
|
||||
#endif
|
||||
void *cache;
|
||||
void *tindex;
|
||||
SHashObj *fieldObj; // <field name, field id>
|
||||
uint64_t suid;
|
||||
int fieldId;
|
||||
pthread_mutex_t mtx;
|
||||
};
|
||||
|
||||
struct SIndexOpts {
|
||||
#ifdef USE_LUCENE
|
||||
void *opts;
|
||||
#endif
|
||||
int32_t numOfItermLimit;
|
||||
int8_t mergeInterval;
|
||||
};
|
||||
|
||||
struct SIndexMultiTermQuery {
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#ifndef __INDEX_CACHE_H__
|
||||
#define __INDEX_CACHE_H__
|
||||
|
||||
#include "index.h"
|
||||
#include "tlockfree.h"
|
||||
// ----------------- row structure in skiplist ---------------------
|
||||
|
||||
/* A data row, the format is like below:
|
||||
* |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->|
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct IndexCache {
|
||||
T_REF_DECLARE()
|
||||
int cVersion; //
|
||||
} IndexCache;
|
||||
|
||||
|
||||
//
|
||||
IndexCache *indexCacheCreate();
|
||||
|
||||
void indexCacheDestroy(IndexCache *cache);
|
||||
|
||||
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType);
|
||||
|
||||
int indexCacheGet(IndexCache *cache, uint64_t *rst);
|
||||
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
#endif
|
|
@ -15,39 +15,58 @@
|
|||
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
#include "lucene++/Lucene_c.h"
|
||||
#endif
|
||||
|
||||
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
|
||||
|
||||
typedef struct SIdxFieldInfo {
|
||||
int id; // generated by index internal
|
||||
int type; // field type
|
||||
} SIdxFieldInfo;
|
||||
|
||||
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
|
||||
static void indexInit();
|
||||
|
||||
static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
|
||||
if (sIdx == NULL) {
|
||||
return -1;
|
||||
}
|
||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
return 0;
|
||||
}
|
||||
SIndex *indexOpen(SIndexOpts *opts, const char *path) {
|
||||
pthread_once(&isInit, indexInit);
|
||||
SIndex *sIdx = malloc(sizeof(SIndex));
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
index_t *index = index_open(path);
|
||||
SIndex *p = malloc(sizeof(SIndex));
|
||||
p->index = index;
|
||||
return p;
|
||||
sIdx->index = index;
|
||||
#endif
|
||||
return NULL;
|
||||
|
||||
sIdx->cache = (void*)indexCacheCreate();
|
||||
sIdx->tindex = NULL;
|
||||
sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
pthread_mutex_init(&sIdx->mtx, NULL);
|
||||
return sIdx;
|
||||
}
|
||||
|
||||
void indexClose(SIndex *index) {
|
||||
void indexClose(SIndex *sIdx) {
|
||||
#ifdef USE_LUCENE
|
||||
index_close(index->index);
|
||||
index->index = NULL;
|
||||
index_close(sIdex->index);
|
||||
sIdx->index = NULL;
|
||||
#endif
|
||||
free(index);
|
||||
indexCacheDestroy(sIdx->cache);
|
||||
taosHashCleanup(sIdx->fieldObj);
|
||||
pthread_mutex_destroy(&sIdx->mtx);
|
||||
free(sIdx);
|
||||
return;
|
||||
|
||||
}
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
#endif
|
||||
int indexPut(SIndex *index, SArray* field_vals, int uid) {
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
index_document_t *doc = index_document_create();
|
||||
|
||||
|
@ -63,6 +82,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
|
|||
index_put(index->index, doc);
|
||||
index_document_destroy(doc);
|
||||
#endif
|
||||
pthread_mutex_lock(&index->mtx);
|
||||
pthread_mutex_unlock(&index->mtx);
|
||||
return 1;
|
||||
|
||||
}
|
||||
|
@ -105,7 +126,9 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
|
|||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
|
||||
|
||||
return 1;
|
||||
}
|
||||
int indexRebuild(SIndex *index, SIndexOpts *opts);
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "index_cache.h"
|
||||
|
||||
static int32_t compareKey(const void *l, const void *r) {
|
||||
char *lp = (char *)l;
|
||||
char *rp = (char *)r;
|
||||
|
||||
// skip total len
|
||||
int32_t ll, rl; // len
|
||||
memcpy(&ll, lp, sizeof(int32_t));
|
||||
memcpy(&rl, rp, sizeof(int32_t));
|
||||
lp += sizeof(int32_t);
|
||||
rp += sizeof(int32_t);
|
||||
|
||||
// compare field id
|
||||
int32_t lf, rf; // field id
|
||||
memcpy(&lf, lp, sizeof(lf));
|
||||
memcpy(&rf, rp, sizeof(rf));
|
||||
if (lf != rf) {
|
||||
return lf < rf ? -1: 1;
|
||||
}
|
||||
lp += sizeof(lf);
|
||||
rp += sizeof(rf);
|
||||
|
||||
// compare field value
|
||||
int32_t lfl, rfl;
|
||||
memcpy(&lfl, lp, sizeof(lfl));
|
||||
memcpy(&rfl, rp, sizeof(rfl));
|
||||
lp += sizeof(lfl);
|
||||
rp += sizeof(rfl);
|
||||
|
||||
//refator later
|
||||
int32_t i, j;
|
||||
for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
|
||||
if (lp[i] == rp[j]) { continue; }
|
||||
else { return lp[i] < rp[j] ? -1 : 1;}
|
||||
}
|
||||
if (i < lfl) { return 1;}
|
||||
else if (j < rfl) { return -1; }
|
||||
lp += lfl;
|
||||
rp += rfl;
|
||||
|
||||
// compare version
|
||||
int32_t lv, rv;
|
||||
memcpy(&lv, lp, sizeof(lv));
|
||||
memcpy(&rv, rp, sizeof(rv));
|
||||
if (lv != rv) {
|
||||
return lv > rv ? -1 : 1;
|
||||
}
|
||||
lp += sizeof(lv);
|
||||
rp += sizeof(rv);
|
||||
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
IndexCache *indexCacheCreate() {
|
||||
IndexCache *cache = calloc(1, sizeof(IndexCache));
|
||||
return cache;
|
||||
}
|
||||
|
||||
void indexCacheDestroy(IndexCache *cache) {
|
||||
free(cache);
|
||||
}
|
||||
|
||||
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
|
||||
if (cache == NULL) { return -1;}
|
||||
int32_t version = T_REF_INC(cache);
|
||||
|
||||
int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType);
|
||||
|
||||
char *buf = calloc(1, total);
|
||||
char *p = buf;
|
||||
|
||||
memcpy(buf, &total, sizeof(total));
|
||||
total += total;
|
||||
|
||||
memcpy(buf, &fieldId, sizeof(fieldId));
|
||||
buf += sizeof(fieldId);
|
||||
|
||||
memcpy(buf, &fvlen, sizeof(fvlen));
|
||||
buf += sizeof(fvlen);
|
||||
memcpy(buf, fieldValue, fvlen);
|
||||
buf += fvlen;
|
||||
|
||||
memcpy(buf, &version, sizeof(version));
|
||||
buf += sizeof(version);
|
||||
|
||||
memcpy(buf, &uid, sizeof(uid));
|
||||
buf += sizeof(uid);
|
||||
|
||||
memcpy(buf, &operType, sizeof(operType));
|
||||
buf += sizeof(operType);
|
||||
|
||||
|
||||
}
|
||||
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1330,7 +1330,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
|
|||
SArray *nodes = taosArrayInit(8, sizeof(FstNode *));
|
||||
while (taosArrayGetSize(sws->stack) > 0) {
|
||||
StreamState *p = (StreamState *)taosArrayPop(sws->stack);
|
||||
if (p->trans >= FST_NODE_LEN(p->node) || automFuncs[aut->type].canMatch(aut, p->autState)) {
|
||||
if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) {
|
||||
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
|
||||
taosArrayPop(sws->inp);
|
||||
}
|
||||
|
|
|
@ -87,9 +87,18 @@ static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
|
|||
if (ssv == NULL || ctx == NULL) {return NULL;}
|
||||
|
||||
char *data = ctx->data;
|
||||
if (ssv->kind == Done) {
|
||||
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
|
||||
}
|
||||
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
|
||||
int val = ssv->val + 1;
|
||||
return startWithStateValueCreate(Running, FST_INT, &val);
|
||||
StartWithStateValue *nsv = startWithStateValueCreate(Running, FST_INT, &val);
|
||||
if (prefixIsMatch(ctx, nsv)) {
|
||||
nsv->kind = Done;
|
||||
} else {
|
||||
nsv->kind = Running;
|
||||
}
|
||||
return nsv;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -32,12 +32,6 @@ bool qIsInsertSql(const char* pStr, size_t length) {
|
|||
}
|
||||
|
||||
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen) {
|
||||
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
|
||||
if (pQueryInfo == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSqlInfo info = doGenerateAST(pStr);
|
||||
if (!info.valid) {
|
||||
strncpy(msg, info.msg, msgLen);
|
||||
|
@ -51,6 +45,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ
|
|||
// do nothing
|
||||
}
|
||||
} else {
|
||||
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
|
||||
if (pQueryInfo == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
|
||||
return terrno;
|
||||
}
|
||||
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
int32_t code = catalogGetHandle(NULL, &pCatalog);
|
||||
code = qParserValidateSqlNode(pCatalog, &info, pQueryInfo, id, msg, msgLen);
|
||||
|
@ -59,6 +59,7 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ
|
|||
}
|
||||
}
|
||||
|
||||
destroySqlInfo(&info);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,17 +21,6 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
|
||||
#include "tlog.h"
|
||||
|
||||
extern int32_t qDebugFlag;
|
||||
|
||||
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -120,7 +120,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd);
|
||||
|
||||
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
|
@ -305,7 +305,7 @@ void msgInit() {
|
|||
|
||||
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
|
||||
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
|
||||
|
@ -323,7 +323,7 @@ void msgInit() {
|
|||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
|
||||
|
|
|
@ -9,5 +9,5 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
scheduler
|
||||
PRIVATE os util planner common
|
||||
PRIVATE os util planner common query
|
||||
)
|
|
@ -24,12 +24,54 @@ extern "C" {
|
|||
#include "tarray.h"
|
||||
#include "planner.h"
|
||||
#include "scheduler.h"
|
||||
#include "thash.h"
|
||||
|
||||
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
|
||||
|
||||
enum {
|
||||
SCH_STATUS_NOT_START = 1,
|
||||
SCH_STATUS_EXECUTING,
|
||||
SCH_STATUS_SUCCEED,
|
||||
SCH_STATUS_FAILED,
|
||||
SCH_STATUS_CANCELLING,
|
||||
SCH_STATUS_CANCELLED
|
||||
};
|
||||
|
||||
typedef struct SSchedulerMgmt {
|
||||
SHashObj *Jobs; // key: queryId, value: SQueryJob*
|
||||
} SSchedulerMgmt;
|
||||
|
||||
typedef struct SQueryTask {
|
||||
uint64_t taskId; // task id
|
||||
char *pSubplan; // operator tree
|
||||
int8_t status; // task status
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
} SQueryTask;
|
||||
|
||||
typedef struct SQueryLevel {
|
||||
int8_t status;
|
||||
int32_t taskNum;
|
||||
|
||||
SArray *subTasks; // Element is SQueryTask
|
||||
SArray *subPlans; // Element is SSubplan
|
||||
} SQueryLevel;
|
||||
|
||||
typedef struct SQueryJob {
|
||||
uint64_t queryId;
|
||||
int32_t levelNum;
|
||||
int32_t levelIdx;
|
||||
int8_t status;
|
||||
SQueryProfileSummary summary;
|
||||
|
||||
SArray *levels; // Element is SQueryLevel, starting from 0.
|
||||
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
|
||||
} SQueryJob;
|
||||
|
||||
|
||||
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
|
||||
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
|
||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
|
||||
|
||||
typedef struct SQuery {
|
||||
SArray **pSubquery;
|
||||
int32_t numOfLevels;
|
||||
int32_t currentLevel;
|
||||
} SQuery;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -14,3 +14,131 @@
|
|||
*/
|
||||
|
||||
#include "schedulerInt.h"
|
||||
#include "taosmsg.h"
|
||||
#include "query.h"
|
||||
|
||||
SSchedulerMgmt schMgmt = {0};
|
||||
|
||||
|
||||
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
|
||||
/*
|
||||
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
|
||||
if (pRequest == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SRequestMsgBody body = {0};
|
||||
buildConnectMsg(pRequest, &body);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
destroyConnectMsg(&body);
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
|
||||
printf("failed to connect to server, reason: %s\n\n", errorMsg);
|
||||
|
||||
destroyRequest(pRequest);
|
||||
taos_close(pTscObj);
|
||||
pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
|
||||
destroyRequest(pRequest);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
||||
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
|
||||
if (levelNum <= 0) {
|
||||
qError("invalid level num:%d", levelNum);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
|
||||
if (NULL == job->levels) {
|
||||
qError("taosArrayInit %d failed", levelNum);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
job->levelNum = levelNum;
|
||||
job->levelIdx = levelNum - 1;
|
||||
job->status = SCH_STATUS_NOT_START;
|
||||
|
||||
job->subPlans = dag->pSubplans;
|
||||
|
||||
SQueryLevel level = {0};
|
||||
SArray *levelPlans = NULL;
|
||||
int32_t levelPlanNum = 0;
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
levelPlans = taosArrayGetP(dag->pSubplans, i);
|
||||
if (NULL == levelPlans) {
|
||||
qError("no level plans for level %d", i);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
|
||||
if (levelPlanNum <= 0) {
|
||||
qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == schMgmt.Jobs) {
|
||||
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
|
||||
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
|
||||
SQueryJob *job = calloc(1, sizeof(SQueryJob));
|
||||
if (NULL == job) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
schValidateAndBuildJob(pDag, job);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
*(SQueryJob **)pJob = job;
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
int32_t scheduleFetchRows(void *pJob, void *data);
|
||||
|
||||
int32_t scheduleCancelJob(void *pJob);
|
||||
|
||||
void schedulerDestroy(void) {
|
||||
if (schMgmt.Jobs) {
|
||||
taosHashCleanup(schMgmt.Jobs); //TBD
|
||||
schMgmt.Jobs = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1120,11 +1120,11 @@ SysNameInfo taosGetSysNameInfo() {
|
|||
|
||||
struct utsname uts;
|
||||
if (!uname(&uts)) {
|
||||
info.sysname = strdup(uts.sysname);
|
||||
info.nodename = strdup(uts.nodename);
|
||||
info.release = strdup(uts.release);
|
||||
info.version = strdup(uts.version);
|
||||
info.machine = strdup(uts.machine);
|
||||
tstrncpy(info.sysname, uts.sysname, sizeof(info.sysname));
|
||||
tstrncpy(info.nodename, uts.nodename, sizeof(info.nodename));
|
||||
tstrncpy(info.release, uts.release, sizeof(info.release));
|
||||
tstrncpy(info.version, uts.version, sizeof(info.version));
|
||||
tstrncpy(info.machine, uts.machine, sizeof(info.machine));
|
||||
}
|
||||
|
||||
return info;
|
||||
|
|
|
@ -317,6 +317,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
|
||||
|
||||
|
||||
// grant
|
||||
|
|
|
@ -183,7 +183,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
|
||||
pSql->fp = fp;
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
||||
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||
|
@ -265,7 +265,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
|||
}
|
||||
|
||||
return;
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||
} else if (pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
|
||||
// in case of show command, return no data
|
||||
(*pSql->fetchFp)(param, pSql, 0);
|
||||
} else {
|
||||
|
@ -273,7 +273,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
|||
}
|
||||
} else { // current query is not completed, continue retrieve from node
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
||||
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
|
||||
|
|
|
@ -322,7 +322,7 @@ TAOS_ROW tscFetchRow(void *param) {
|
|||
|
||||
// current data set are exhausted, fetch more data from node
|
||||
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE_MNODE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
|
|
|
@ -539,7 +539,7 @@ int doBuildAndSendMsg(SSqlObj *pSql) {
|
|||
|
||||
if (pCmd->command == TSDB_SQL_SELECT ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_MNODE ||
|
||||
pCmd->command == TSDB_SQL_INSERT ||
|
||||
pCmd->command == TSDB_SQL_CONNECT ||
|
||||
pCmd->command == TSDB_SQL_HB ||
|
||||
|
@ -2749,7 +2749,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if ((pCmd->command == TSDB_SQL_RETRIEVE) ||
|
||||
if ((pCmd->command == TSDB_SQL_RETRIEVE_MNODE) ||
|
||||
((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) ||
|
||||
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
|
@ -3174,7 +3174,7 @@ void tscInitMsgsFp() {
|
|||
|
||||
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
|
||||
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
|
||||
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
|
||||
|
@ -3192,7 +3192,7 @@ void tscInitMsgsFp() {
|
|||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
|
||||
|
||||
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
|
||||
|
@ -3214,7 +3214,7 @@ void tscInitMsgsFp() {
|
|||
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
|
||||
|
||||
tscKeepConn[TSDB_SQL_SHOW] = 1;
|
||||
tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
|
||||
tscKeepConn[TSDB_SQL_RETRIEVE_MNODE] = 1;
|
||||
tscKeepConn[TSDB_SQL_SELECT] = 1;
|
||||
tscKeepConn[TSDB_SQL_FETCH] = 1;
|
||||
tscKeepConn[TSDB_SQL_HB] = 1;
|
||||
|
|
|
@ -460,7 +460,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) {
|
|||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE_MNODE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
|
||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
|
@ -582,10 +582,10 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
|
|||
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && pSql->pStream == NULL && (pTableMetaInfo->pTableMeta != NULL) &&
|
||||
(cmd == TSDB_SQL_SELECT ||
|
||||
cmd == TSDB_SQL_SHOW ||
|
||||
cmd == TSDB_SQL_RETRIEVE ||
|
||||
cmd == TSDB_SQL_RETRIEVE_MNODE ||
|
||||
cmd == TSDB_SQL_FETCH)) {
|
||||
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
|
||||
tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]);
|
||||
|
||||
tscBuildAndSendRequest(pSql, NULL);
|
||||
|
|
|
@ -1670,7 +1670,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|||
pSql1->fp = joinRetrieveFinalResCallback;
|
||||
|
||||
if (pCmd1->command < TSDB_SQL_LOCAL) {
|
||||
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
||||
tscBuildAndSendRequest(pSql1, NULL);
|
||||
|
|
Loading…
Reference in New Issue