commit
4bd996e164
|
@ -108,7 +108,7 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup)
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
||||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
|
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo);
|
||||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -13,20 +13,20 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_COMMON_TMESSAGE_H_
|
#ifndef _TD_QUERY_H_
|
||||||
#define _TD_COMMON_TMESSAGE_H_
|
#define _TD_QUERY_H_
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
extern int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
|
||||||
extern int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
|
|
||||||
|
|
||||||
|
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
||||||
|
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_COMMON_TMESSAGE_H_*/
|
#endif /*_TD_QUERY_H_*/
|
|
@ -21,17 +21,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#include "tlog.h"
|
|
||||||
|
|
||||||
extern int32_t cDebugFlag;
|
|
||||||
|
|
||||||
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,4 +9,5 @@ add_subdirectory(cache)
|
||||||
add_subdirectory(catalog)
|
add_subdirectory(catalog)
|
||||||
add_subdirectory(executor)
|
add_subdirectory(executor)
|
||||||
add_subdirectory(planner)
|
add_subdirectory(planner)
|
||||||
add_subdirectory(function)
|
add_subdirectory(function)
|
||||||
|
add_subdirectory(query)
|
||||||
|
|
|
@ -8,5 +8,5 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
catalog
|
catalog
|
||||||
PRIVATE os util common transport
|
PRIVATE os util common transport query
|
||||||
)
|
)
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "catalogInt.h"
|
#include "catalogInt.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tmessage.h"
|
#include "query.h"
|
||||||
|
|
||||||
SCatalogMgmt ctgMgmt = {0};
|
SCatalogMgmt ctgMgmt = {0};
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
||||||
SEpSet *pVnodeEpSet = NULL;
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
int32_t code = tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen);
|
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
||||||
|
|
||||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
|
||||||
code = tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen);
|
code = queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,23 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
|
||||||
|
/*
|
||||||
|
if (NULL == pCatalog->dbCache.cache) {
|
||||||
|
*exist = 0;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen)
|
||||||
|
|
||||||
|
if (dbInfo) {
|
||||||
|
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
*exist = 1;
|
||||||
|
*/
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||||
|
@ -245,12 +261,41 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
|
||||||
|
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
||||||
|
return TSDB_CODE_CTG_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
int32_t exist = 0;
|
||||||
|
|
||||||
|
if (0 == forceUpdate) {
|
||||||
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
||||||
|
|
||||||
|
if (exist) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SDBVgroupInfo* newDbInfo = NULL;
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo));
|
||||||
|
|
||||||
|
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo));
|
||||||
|
|
||||||
|
if (dbInfo) {
|
||||||
|
*dbInfo = newDbInfo;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -265,7 +310,7 @@ int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, cons
|
||||||
SEpSet *pVnodeEpSet = NULL;
|
SEpSet *pVnodeEpSet = NULL;
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
|
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
aux_source_directory(src QUERY_SRC)
|
||||||
|
add_library(query ${QUERY_SRC})
|
||||||
|
target_include_directories(
|
||||||
|
query
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/query"
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(
|
||||||
|
query
|
||||||
|
PRIVATE os util common transport
|
||||||
|
)
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_QUERY_INT_H_
|
||||||
|
#define _TD_QUERY_INT_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
#include "tlog.h"
|
||||||
|
|
||||||
|
extern int32_t qDebugFlag;
|
||||||
|
|
||||||
|
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", qDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_QUERY_INT_H_*/
|
|
@ -14,15 +14,15 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "commonint.h"
|
#include "queryInt.h"
|
||||||
|
|
||||||
|
|
||||||
int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
||||||
|
|
||||||
int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
|
int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
|
||||||
|
|
||||||
|
|
||||||
int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
int32_t queryBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
if (NULL == msg || NULL == msgLen) {
|
if (NULL == msg || NULL == msgLen) {
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
if (NULL == input || NULL == msg || NULL == msgLen) {
|
if (NULL == input || NULL == msg || NULL == msgLen) {
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
@ -72,17 +72,17 @@ int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
|
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
|
||||||
|
|
||||||
if (pRsp->vgroupNum < 0) {
|
if (pRsp->vgroupNum < 0) {
|
||||||
tscError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum);
|
qError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->vgroupVersion < 0) {
|
if (pRsp->vgroupVersion < 0) {
|
||||||
tscError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion);
|
qError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) {
|
if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) {
|
||||||
tscError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum);
|
qError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,12 +104,11 @@ int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void msgInit() {
|
void msgInit() {
|
||||||
tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg;
|
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
|
||||||
tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = tscBuildVgroupListReqMsg;
|
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
|
||||||
|
|
||||||
|
|
||||||
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
|
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
|
||||||
tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = tscProcessVgroupListRsp;
|
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
|
Loading…
Reference in New Issue