Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv
This commit is contained in:
commit
8da5bbafd1
|
@ -2,6 +2,8 @@ build/
|
|||
compile_commands.json
|
||||
.cache
|
||||
.ycm_extra_conf.py
|
||||
.tasks
|
||||
.vimspector.json
|
||||
.vscode/
|
||||
.idea/
|
||||
cmake-build-debug/
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
|
@ -65,7 +66,7 @@ int32_t init_env() {
|
|||
taos_free_result(pRes);
|
||||
|
||||
|
||||
char* sql = "select * from st1";
|
||||
const char* sql = "select * from st1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
/*if (taos_errno(pRes) != 0) {*/
|
||||
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
|
||||
|
@ -112,14 +113,20 @@ void basic_consume_loop(tmq_t *tmq,
|
|||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t cnt = 0;
|
||||
clock_t startTime = clock();
|
||||
while (running) {
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmq) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
/*msg_process(tmqmessage);*/
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clock_t endTime = clock();
|
||||
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
|
@ -163,6 +170,6 @@ int main() {
|
|||
code = init_env();
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
sync_consume_loop(tmq, topic_list);
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
/*sync_consume_loop(tmq, topic_list);*/
|
||||
}
|
||||
|
|
|
@ -213,7 +213,7 @@ typedef struct tmq_message_t tmq_message_t;
|
|||
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
|
||||
|
||||
DLL_EXPORT tmq_list_t *tmq_list_new();
|
||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *);
|
||||
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
|
||||
|
||||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
|
|
|
@ -154,8 +154,8 @@ typedef enum _mgmt_table {
|
|||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
char* dbName;
|
||||
char* tableFullName;
|
||||
char* dbFName;
|
||||
char* tbName;
|
||||
} SBuildTableMetaInput;
|
||||
|
||||
typedef struct {
|
||||
|
@ -696,8 +696,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
char dbFname[TSDB_DB_FNAME_LEN];
|
||||
char tableFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} STableInfoReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -722,9 +722,9 @@ typedef struct {
|
|||
} SVgroupsInfo;
|
||||
|
||||
typedef struct {
|
||||
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
|
||||
char stbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFname[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
int8_t precision;
|
||||
|
|
|
@ -49,17 +49,19 @@ typedef struct SCatalogCfg {
|
|||
uint32_t maxTblCacheNum;
|
||||
uint32_t maxDBCacheNum;
|
||||
uint32_t dbRentSec;
|
||||
uint32_t stableRentSec;
|
||||
uint32_t stbRentSec;
|
||||
} SCatalogCfg;
|
||||
|
||||
typedef struct SSTableMetaVersion {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
uint64_t suid;
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
} SSTableMetaVersion;
|
||||
|
||||
typedef struct SDbVgVersion {
|
||||
char dbName[TSDB_DB_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
int64_t dbId;
|
||||
int32_t vgVersion;
|
||||
} SDbVgVersion;
|
||||
|
@ -99,7 +101,9 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
|
|||
|
||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
|
||||
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
||||
|
||||
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid);
|
||||
|
||||
/**
|
||||
* Get a table's meta data.
|
||||
|
|
|
@ -81,16 +81,15 @@ typedef struct STableMeta {
|
|||
} STableMeta;
|
||||
|
||||
typedef struct SDBVgroupInfo {
|
||||
SRWLatch lock;
|
||||
uint64_t dbId;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod;
|
||||
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
||||
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
|
||||
} SDBVgroupInfo;
|
||||
|
||||
typedef struct SUseDbOutput {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SDBVgroupInfo dbVgroup;
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
SDBVgroupInfo *dbVgroup;
|
||||
} SUseDbOutput;
|
||||
|
||||
enum {
|
||||
|
@ -103,8 +102,9 @@ enum {
|
|||
|
||||
typedef struct STableMetaOutput {
|
||||
int32_t metaType;
|
||||
char ctbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char tbFname[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
SCTableMeta ctbMeta;
|
||||
STableMeta *tbMeta;
|
||||
} STableMetaOutput;
|
||||
|
|
|
@ -78,6 +78,11 @@ typedef struct SNodeList {
|
|||
SListCell* pTail;
|
||||
} SNodeList;
|
||||
|
||||
typedef struct SNameStr {
|
||||
int32_t len;
|
||||
char* pName;
|
||||
} SNameStr;
|
||||
|
||||
typedef struct SDataType {
|
||||
uint8_t type;
|
||||
uint8_t precision;
|
||||
|
@ -89,6 +94,7 @@ typedef struct SExprNode {
|
|||
ENodeType nodeType;
|
||||
SDataType resType;
|
||||
char aliasName[TSDB_COL_NAME_LEN];
|
||||
SNodeList* pAssociationList;
|
||||
} SExprNode;
|
||||
|
||||
typedef enum EColumnType {
|
||||
|
@ -102,7 +108,9 @@ typedef struct SColumnNode {
|
|||
EColumnType colType; // column or tag
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
char tableAlias[TSDB_TABLE_NAME_LEN];
|
||||
char colName[TSDB_COL_NAME_LEN];
|
||||
SNode* pProjectRef;
|
||||
} SColumnNode;
|
||||
|
||||
typedef struct SValueNode {
|
||||
|
@ -176,13 +184,16 @@ typedef struct SFunctionNode {
|
|||
|
||||
typedef struct STableNode {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
char tableAliasName[TSDB_COL_NAME_LEN];
|
||||
char tableAlias[TSDB_TABLE_NAME_LEN];
|
||||
} STableNode;
|
||||
|
||||
struct STableMeta;
|
||||
|
||||
typedef struct SRealTableNode {
|
||||
STableNode table; // QUERY_NODE_REAL_TABLE
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
struct STableMeta* pMeta;
|
||||
} SRealTableNode;
|
||||
|
||||
typedef struct STempTableNode {
|
||||
|
@ -273,7 +284,6 @@ typedef struct SFillNode {
|
|||
typedef struct SSelectStmt {
|
||||
ENodeType type; // QUERY_NODE_SELECT_STMT
|
||||
bool isDistinct;
|
||||
bool isStar;
|
||||
SNodeList* pProjectionList; // SNode
|
||||
SNode* pFromTable;
|
||||
SNode* pWhere;
|
||||
|
@ -295,6 +305,8 @@ typedef struct SSetOperator {
|
|||
ESetOperatorType opType;
|
||||
SNode* pLeft;
|
||||
SNode* pRight;
|
||||
SNodeList* pOrderByList; // SOrderByExprNode
|
||||
SNode* pLimit;
|
||||
} SSetOperator;
|
||||
|
||||
SNode* nodesMakeNode(ENodeType type);
|
||||
|
@ -306,8 +318,10 @@ void nodesDestroyList(SNodeList* pList);
|
|||
|
||||
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
|
||||
|
||||
bool nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||
bool nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
|
||||
void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||
void nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
|
||||
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
|
||||
|
||||
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext);
|
||||
|
||||
|
|
|
@ -30,4 +30,4 @@ static const int32_t endian_test_var = 1;
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_OS_ENDIAN_H_*/
|
||||
#endif /*_TD_OS_ENDIAN_H_*/
|
||||
|
|
|
@ -36,25 +36,25 @@ extern "C" {
|
|||
|
||||
#else
|
||||
|
||||
#define TSWAP(a, b, c) \
|
||||
do { \
|
||||
typeof(a) __tmp = (a); \
|
||||
(a) = (b); \
|
||||
(b) = __tmp; \
|
||||
#define TSWAP(a, b, c) \
|
||||
do { \
|
||||
__typeof(a) __tmp = (a); \
|
||||
(a) = (b); \
|
||||
(b) = __tmp; \
|
||||
} while (0)
|
||||
|
||||
#define TMAX(a, b) \
|
||||
({ \
|
||||
typeof(a) __a = (a); \
|
||||
typeof(b) __b = (b); \
|
||||
(__a > __b) ? __a : __b; \
|
||||
#define TMAX(a, b) \
|
||||
({ \
|
||||
__typeof(a) __a = (a); \
|
||||
__typeof(b) __b = (b); \
|
||||
(__a > __b) ? __a : __b; \
|
||||
})
|
||||
|
||||
#define TMIN(a, b) \
|
||||
({ \
|
||||
typeof(a) __a = (a); \
|
||||
typeof(b) __b = (b); \
|
||||
(__a < __b) ? __a : __b; \
|
||||
#define TMIN(a, b) \
|
||||
({ \
|
||||
__typeof(a) __a = (a); \
|
||||
__typeof(b) __b = (b); \
|
||||
(__a < __b) ? __a : __b; \
|
||||
})
|
||||
#endif
|
||||
|
||||
|
|
|
@ -440,7 +440,10 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
|
||||
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
|
||||
|
||||
|
||||
//parser
|
||||
#define TSDB_CODE_PARSER_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
|
||||
#define TSDB_CODE_PARSER_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
|
||||
#define TSDB_CODE_PARSER_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -41,19 +41,14 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
|
||||
|
||||
if (rsp->vgVersion < 0) {
|
||||
SDbVgVersion dbInfo;
|
||||
strcpy(dbInfo.dbName, rsp->db);
|
||||
dbInfo.dbId = rsp->uid;
|
||||
dbInfo.vgVersion = rsp->vgVersion;
|
||||
|
||||
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
|
||||
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
||||
} else {
|
||||
SDBVgroupInfo vgInfo = {0};
|
||||
vgInfo.dbId = rsp->uid;
|
||||
vgInfo.vgVersion = rsp->vgVersion;
|
||||
vgInfo.hashMethod = rsp->hashMethod;
|
||||
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgInfo) {
|
||||
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgHash) {
|
||||
tscError("hash init[%d] failed", rsp->vgNum);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -67,16 +62,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
tscError("hash push failed, errno:%d", errno);
|
||||
taosHashCleanup(vgInfo.vgInfo);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
|
||||
if (code) {
|
||||
taosHashCleanup(vgInfo.vgInfo);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t msgLen = 0;
|
||||
int32_t code = 0;
|
||||
int32_t schemaNum = 0;
|
||||
|
||||
while (msgLen < valueLen) {
|
||||
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
|
||||
|
||||
rsp->numOfColumns = ntohl(rsp->numOfColumns);
|
||||
rsp->suid = be64toh(rsp->suid);
|
||||
|
||||
if (rsp->numOfColumns < 0) {
|
||||
schemaNum = 0;
|
||||
|
||||
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
|
||||
code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
|
||||
} else {
|
||||
rsp->numOfTags = ntohl(rsp->numOfTags);
|
||||
|
||||
schemaNum = rsp->numOfColumns + rsp->numOfTags;
|
||||
/*
|
||||
rsp->vgNum = ntohl(rsp->vgNum);
|
||||
rsp->uid = be64toh(rsp->uid);
|
||||
|
||||
SDBVgroupInfo vgInfo = {0};
|
||||
vgInfo.dbId = rsp->uid;
|
||||
vgInfo.vgVersion = rsp->vgVersion;
|
||||
vgInfo.hashMethod = rsp->hashMethod;
|
||||
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgHash) {
|
||||
tscError("hash init[%d] failed", rsp->vgNum);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < rsp->vgNum; ++i) {
|
||||
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
|
||||
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
|
||||
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
|
||||
|
||||
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
|
||||
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
tscError("hash push failed, errno:%d", errno);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
|
||||
if (code) {
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
|
||||
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
|
@ -122,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
|
|||
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_STBINFO:
|
||||
case HEARTBEAT_KEY_STBINFO:{
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
tscError("invalid hb key type:%d", kv->key);
|
||||
break;
|
||||
|
@ -157,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
|
|||
tfree(param);
|
||||
|
||||
if (rspNum) {
|
||||
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
|
||||
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
|
||||
} else {
|
||||
atomic_add_fetch_32(&emptyRspNum, 1);
|
||||
}
|
||||
|
@ -204,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||
SSTableMetaVersion *stbs = NULL;
|
||||
uint32_t stbNum = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (stbNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stbNum; ++i) {
|
||||
SSTableMetaVersion *stb = &stbs[i];
|
||||
stb->suid = htobe64(stb->suid);
|
||||
stb->sversion = htons(stb->sversion);
|
||||
stb->tversion = htons(stb->tversion);
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
|
||||
|
||||
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
|
||||
|
||||
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
|
||||
int64_t *clusterId = (int64_t *)param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
@ -219,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
|
|||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -389,7 +504,6 @@ static void hbStopThread() {
|
|||
}
|
||||
|
||||
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
|
||||
return NULL;
|
||||
hbMgrInit();
|
||||
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
|
||||
if (pAppHbMgr == NULL) {
|
||||
|
@ -447,7 +561,6 @@ void appHbMgrCleanup(void) {
|
|||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
return 0;
|
||||
// init once
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
@ -504,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
|
|||
}
|
||||
|
||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
|
||||
return 0;
|
||||
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
|
||||
SHbConnInfo info = {0};
|
||||
|
||||
|
|
|
@ -109,7 +109,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
|
||||
p->mgmtEp = epSet;
|
||||
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
|
||||
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
|
||||
p->pAppHbMgr = appHbMgrInit(p, key);
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
|
||||
pInst = &p;
|
||||
|
|
|
@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
|
||||
|
||||
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/
|
||||
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
|
||||
|
||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
|
||||
|
@ -302,15 +302,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
|
||||
|
||||
SDbVgVersion dbVer = {0};
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
|
||||
dbVer.dbId = be64toh(rsp->uid);
|
||||
rsp->uid = be64toh(rsp->uid);
|
||||
|
||||
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
|
||||
catalogRemoveDBVgroup(pCatalog, &dbVer);
|
||||
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
||||
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return code;
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "parser.h"
|
||||
|
@ -146,7 +148,7 @@ tmq_list_t* tmq_list_new() {
|
|||
return ptr;
|
||||
}
|
||||
|
||||
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
|
||||
int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
|
||||
if (ptr->cnt >= ptr->tot - 1) return -1;
|
||||
ptr->elems[ptr->cnt] = strdup(src);
|
||||
ptr->cnt++;
|
||||
|
@ -366,7 +368,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
.logicalPlan = "no logic plan",
|
||||
.logicalPlan = (char*)"no logic plan",
|
||||
};
|
||||
|
||||
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
|
||||
|
@ -512,7 +514,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return -1;
|
||||
}
|
||||
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
|
||||
/*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/
|
||||
/*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||
if (pRsp->numOfTopics == 0) {
|
||||
/*printf("no data\n");*/
|
||||
free(pRsp);
|
||||
|
|
|
@ -630,14 +630,6 @@ void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
#define TSWAP(a, b, c) \
|
||||
do { \
|
||||
typeof(a) __tmp = (a); \
|
||||
(a) = (b); \
|
||||
(b) = __tmp; \
|
||||
} while (0)
|
||||
|
||||
|
||||
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
|
|
|
@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) {
|
|||
|
||||
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
|
||||
|
||||
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; }
|
||||
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
|
||||
|
||||
void Testbase::SendShowRetrieveReq() {
|
||||
int32_t contLen = sizeof(SRetrieveTableReq);
|
||||
|
@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() {
|
|||
pos = 0;
|
||||
}
|
||||
|
||||
const char* Testbase::GetShowName() { return pMeta->tbFname; }
|
||||
const char* Testbase::GetShowName() { return pMeta->tbName; }
|
||||
|
||||
int8_t Testbase::GetShowInt8() {
|
||||
int8_t data = *((int8_t*)(pData + pos));
|
||||
|
|
|
@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode);
|
|||
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
||||
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
||||
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
|
|||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
|
@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
|
|||
|
||||
pShow->numOfRows = 1;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
|
@ -54,13 +55,14 @@ void mndCleanupConsumer(SMnode *pMnode) {}
|
|||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void *buf = malloc(tlen);
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void *abuf = buf;
|
||||
|
@ -88,6 +90,7 @@ CM_ENCODE_OVER:
|
|||
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
|
||||
|
@ -106,7 +109,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t len;
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
||||
void *buf = malloc(len);
|
||||
buf = malloc(len);
|
||||
if (buf == NULL) goto CM_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||
|
|
|
@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
|
|||
|
||||
len = 0;
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName);
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName);
|
||||
if (pDb == NULL) {
|
||||
mInfo("db %s not exist", db->dbName);
|
||||
mInfo("db %s not exist", db->dbFName);
|
||||
|
||||
len = sizeof(SUseDbRsp);
|
||||
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
|
||||
|
@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
|
|||
}
|
||||
|
||||
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
|
||||
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN);
|
||||
memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN);
|
||||
if (pDb) {
|
||||
int32_t vgNum = 0;
|
||||
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
|
||||
|
@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
|
|||
|
||||
pShow->numOfRows = TSDB_CONFIG_NUMBER;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
mndUpdateMnodeRole(pMnode);
|
||||
return 0;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "mndProfile.h"
|
||||
//#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
//#include "mndTopic.h"
|
||||
|
@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_STBINFO:
|
||||
|
||||
case HEARTBEAT_KEY_STBINFO: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
mError("invalid kv key:%d", kv->key);
|
||||
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
|
||||
|
@ -623,7 +631,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -792,7 +800,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = 1000000;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -723,20 +723,23 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
SMnode *pMnode = pReq->pMnode;
|
||||
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pInfo->dbFName, pInfo->tbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname);
|
||||
mDebug("stb:%s, start to retrieve meta", tbFName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pInfo->dbFName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname);
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (pStb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_MND_INVALID_STB;
|
||||
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -750,11 +753,13 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMeta->tbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
strcpy(pMeta->dbFName, pStb->db);
|
||||
strcpy(pMeta->tbName, pInfo->tbName);
|
||||
strcpy(pMeta->stbName, pInfo->tbName);
|
||||
pMeta->numOfTags = htonl(pStb->numOfTags);
|
||||
pMeta->numOfColumns = htonl(pStb->numOfColumns);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
|
@ -779,10 +784,113 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
pReq->pCont = pMeta;
|
||||
pReq->contLen = contLen;
|
||||
|
||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
|
||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", tbFName, pStb->numOfColumns, pStb->numOfTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
void *buf = malloc(bufSize);
|
||||
int32_t len = 0;
|
||||
int32_t contLen = 0;
|
||||
STableMetaRsp *pRsp = NULL;
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSTableMetaVersion *stb = &stbs[i];
|
||||
stb->suid = be64toh(stb->suid);
|
||||
stb->sversion = ntohs(stb->sversion);
|
||||
stb->tversion = ntohs(stb->tversion);
|
||||
|
||||
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
|
||||
bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
buf = realloc(buf, bufSize);
|
||||
}
|
||||
|
||||
pRsp = (STableMetaRsp *)((char *)buf + contLen);
|
||||
|
||||
strcpy(pRsp->dbFName, stb->dbFName);
|
||||
strcpy(pRsp->tbName, stb->stbName);
|
||||
strcpy(pRsp->stbName, stb->stbName);
|
||||
|
||||
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
|
||||
if (pDb == NULL) {
|
||||
pRsp->numOfColumns = -1;
|
||||
pRsp->suid = htobe64(stb->suid);
|
||||
contLen += sizeof(STableMetaRsp);
|
||||
mWarn("db:%s, failed to require db since %s", stb->dbFName, terrstr());
|
||||
continue;
|
||||
}
|
||||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (pStb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pRsp->numOfColumns = -1;
|
||||
pRsp->suid = htobe64(stb->suid);
|
||||
contLen += sizeof(STableMetaRsp);
|
||||
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
continue;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pStb->lock);
|
||||
|
||||
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
|
||||
int32_t len = totalCols * sizeof(SSchema);
|
||||
|
||||
contLen += sizeof(STableMetaRsp) + len;
|
||||
|
||||
if (contLen > bufSize) {
|
||||
bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
buf = realloc(buf, bufSize);
|
||||
}
|
||||
|
||||
pRsp->numOfTags = htonl(pStb->numOfTags);
|
||||
pRsp->numOfColumns = htonl(pStb->numOfColumns);
|
||||
pRsp->precision = pDb->cfg.precision;
|
||||
pRsp->tableType = TSDB_SUPER_TABLE;
|
||||
pRsp->update = pDb->cfg.update;
|
||||
pRsp->sversion = htonl(pStb->version);
|
||||
pRsp->suid = htobe64(pStb->uid);
|
||||
pRsp->tuid = htobe64(pStb->uid);
|
||||
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pRsp->pSchema[i];
|
||||
SSchema *pSrcSchema = &pStb->pSchema[i];
|
||||
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->type = pSrcSchema->type;
|
||||
pSchema->colId = htonl(pSrcSchema->colId);
|
||||
pSchema->bytes = htonl(pSrcSchema->bytes);
|
||||
}
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
}
|
||||
|
||||
if (contLen > 0) {
|
||||
*rsp = buf;
|
||||
*rspLen = contLen;
|
||||
} else {
|
||||
*rsp = NULL;
|
||||
tfree(buf);
|
||||
*rspLen = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
|
@ -855,7 +963,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndConsumer.h"
|
||||
|
@ -372,13 +373,14 @@ void mndCleanupSubscribe(SMnode *pMnode) {}
|
|||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
||||
|
||||
void *buf = malloc(tlen);
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto SUB_ENCODE_OVER;
|
||||
|
||||
void *abuf = buf;
|
||||
|
@ -406,6 +408,7 @@ SUB_ENCODE_OVER:
|
|||
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
||||
|
@ -425,7 +428,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t tlen;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
||||
void *buf = malloc(tlen + 1);
|
||||
buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto SUB_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||
|
@ -742,7 +745,7 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);
|
||||
mDebug("subscribe:%s, start to retrieve meta", pInfo->tbName);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
|
||||
|
@ -873,7 +876,7 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRs
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -338,7 +338,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
|
||||
mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
|
||||
|
@ -469,7 +469,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
|
|||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
|
|||
}
|
||||
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
pShow->replica = dnodeId;
|
||||
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
|
|||
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
|
||||
|
||||
STableMetaRsp* pMeta = test.GetShowMeta();
|
||||
EXPECT_STREQ(pMeta->tbFname, "show connections");
|
||||
EXPECT_STREQ(pMeta->tbName, "show connections");
|
||||
EXPECT_EQ(pMeta->numOfTags, 0);
|
||||
EXPECT_EQ(pMeta->numOfColumns, 7);
|
||||
EXPECT_EQ(pMeta->precision, 0);
|
||||
|
|
|
@ -126,7 +126,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
int32_t contLen = sizeof(STableInfoReq);
|
||||
|
||||
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->tableFname, "1.d1.stb");
|
||||
strcpy(pReq->dbFName, "1.d1");
|
||||
strcpy(pReq->tbName, "stb");
|
||||
|
||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
|
@ -146,8 +147,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
}
|
||||
|
||||
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb");
|
||||
EXPECT_STREQ(pRsp->stbFname, "");
|
||||
EXPECT_STREQ(pRsp->dbFName, "1.d1");
|
||||
EXPECT_STREQ(pRsp->tbName, "stb");
|
||||
EXPECT_STREQ(pRsp->stbName, "stb");
|
||||
EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||
EXPECT_EQ(pRsp->numOfTags, 3);
|
||||
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
#ifndef _TD_TQ_INT_H_
|
||||
#define _TD_TQ_INT_H_
|
||||
|
||||
#include "tq.h"
|
||||
#include "meta.h"
|
||||
#include "tlog.h"
|
||||
#include "tq.h"
|
||||
#include "trpc.h"
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -26,29 +26,48 @@ extern "C" {
|
|||
|
||||
extern int32_t tqDebugFlag;
|
||||
|
||||
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
|
||||
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
|
||||
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
|
||||
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
|
||||
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
|
||||
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
|
||||
#define tqFatal(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqError(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqWarn(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqInfo(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("TQ ", 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqDebug(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tqTrace(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
// create persistent storage for meta info such as consuming offset
|
||||
// return value > 0: cgId
|
||||
// return value <= 0: error code
|
||||
// int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
|
||||
// create ring buffer in memory and load consuming offset
|
||||
// int tqOpenTCGroup(STQ*, const char* topic, int cgId);
|
||||
// destroy ring buffer and persist consuming offset
|
||||
// int tqCloseTCGroup(STQ*, const char* topic, int cgId);
|
||||
// delete persistent storage for meta info
|
||||
// int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||
|
||||
//int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
|
||||
//const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
|
||||
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -12,19 +12,12 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tcompare.h"
|
||||
#include "tqInt.h"
|
||||
#include "tqMetaStore.h"
|
||||
|
||||
// static
|
||||
// read next version data
|
||||
//
|
||||
// send to fetch queue
|
||||
//
|
||||
// handle management message
|
||||
//
|
||||
|
||||
int tqInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
|
|
@ -84,7 +84,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int msgLen = 0;
|
||||
int32_t code = TSDB_CODE_VND_APP_ERROR;
|
||||
|
||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
|
||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
|
||||
if (pTbCfg == NULL) {
|
||||
code = TSDB_CODE_VND_TB_NOT_EXIST;
|
||||
goto _exit;
|
||||
|
@ -119,13 +119,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname));
|
||||
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
|
||||
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
|
||||
strcpy(pTbMetaMsg->tbName, pReq->tbName);
|
||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
|
||||
strcpy(pTbMetaMsg->stbName, pStbCfg->name);
|
||||
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
|
||||
} else if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
|
||||
strcpy(pTbMetaMsg->stbName, pTbCfg->name);
|
||||
pTbMetaMsg->suid = htobe64(uid);
|
||||
}
|
||||
pTbMetaMsg->numOfTags = htonl(nTagCols);
|
||||
|
|
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
|
||||
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
|
||||
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
|
||||
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
|
||||
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
|
||||
#define CTG_DEFAULT_RENT_SECOND 10
|
||||
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
|
||||
|
||||
|
@ -47,55 +47,52 @@ enum {
|
|||
CTG_RENT_STABLE,
|
||||
};
|
||||
|
||||
typedef struct SCTGDebug {
|
||||
typedef struct SCtgDebug {
|
||||
int32_t lockDebug;
|
||||
} SCTGDebug;
|
||||
} SCtgDebug;
|
||||
|
||||
|
||||
typedef struct SVgroupListCache {
|
||||
int32_t vgroupVersion;
|
||||
SHashObj *cache; // key:vgId, value:SVgroupInfo
|
||||
} SVgroupListCache;
|
||||
typedef struct SCtgTbMetaCache {
|
||||
SRWLatch stbLock;
|
||||
SHashObj *cache; //key:tbname, value:STableMeta
|
||||
SHashObj *stbCache; //key:suid, value:STableMeta*
|
||||
} SCtgTbMetaCache;
|
||||
|
||||
typedef struct SDBVgroupCache {
|
||||
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
|
||||
} SDBVgroupCache;
|
||||
typedef struct SCtgDBCache {
|
||||
SRWLatch vgLock;
|
||||
int8_t deleted;
|
||||
SDBVgroupInfo *vgInfo;
|
||||
SCtgTbMetaCache tbCache;
|
||||
} SCtgDBCache;
|
||||
|
||||
typedef struct STableMetaCache {
|
||||
SRWLatch stableLock;
|
||||
SHashObj *cache; //key:fulltablename, value:STableMeta
|
||||
SHashObj *stableCache; //key:suid, value:STableMeta*
|
||||
} STableMetaCache;
|
||||
|
||||
typedef struct SRentSlotInfo {
|
||||
typedef struct SCtgRentSlot {
|
||||
SRWLatch lock;
|
||||
bool needSort;
|
||||
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
|
||||
} SRentSlotInfo;
|
||||
} SCtgRentSlot;
|
||||
|
||||
typedef struct SMetaRentMgmt {
|
||||
typedef struct SCtgRentMgmt {
|
||||
int8_t type;
|
||||
uint16_t slotNum;
|
||||
uint16_t slotRIdx;
|
||||
int64_t lastReadMsec;
|
||||
SRentSlotInfo *slots;
|
||||
} SMetaRentMgmt;
|
||||
SCtgRentSlot *slots;
|
||||
} SCtgRentMgmt;
|
||||
|
||||
typedef struct SCatalog {
|
||||
uint64_t clusterId;
|
||||
SDBVgroupCache dbCache;
|
||||
STableMetaCache tableCache;
|
||||
SMetaRentMgmt dbRent;
|
||||
SMetaRentMgmt stableRent;
|
||||
uint64_t clusterId;
|
||||
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
|
||||
SCtgRentMgmt dbRent;
|
||||
SCtgRentMgmt stbRent;
|
||||
} SCatalog;
|
||||
|
||||
typedef struct SCtgApiStat {
|
||||
|
||||
} SCtgApiStat;
|
||||
|
||||
typedef struct SCtgResourceStat {
|
||||
typedef struct SCtgRuntimeStat {
|
||||
|
||||
} SCtgResourceStat;
|
||||
} SCtgRuntimeStat;
|
||||
|
||||
typedef struct SCtgCacheStat {
|
||||
|
||||
|
@ -103,7 +100,7 @@ typedef struct SCtgCacheStat {
|
|||
|
||||
typedef struct SCatalogStat {
|
||||
SCtgApiStat api;
|
||||
SCtgResourceStat resource;
|
||||
SCtgRuntimeStat runtime;
|
||||
SCtgCacheStat cache;
|
||||
} SCatalogStat;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -128,15 +128,14 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
|
|||
strcpy(sn.dbname, "db1");
|
||||
strcpy(sn.tname, ctgTestSTablename);
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&cn, tbFullName);
|
||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&cn, db);
|
||||
|
||||
strcpy(output->dbFName, db);
|
||||
SET_META_TYPE_BOTH_TABLE(output->metaType);
|
||||
|
||||
strcpy(output->ctbFname, tbFullName);
|
||||
|
||||
tNameExtractFullName(&cn, tbFullName);
|
||||
strcpy(output->tbFname, tbFullName);
|
||||
strcpy(output->ctbName, cn.tname);
|
||||
strcpy(output->tbName, sn.tname);
|
||||
|
||||
output->ctbMeta.vgId = 9;
|
||||
output->ctbMeta.tableType = TSDB_CHILD_TABLE;
|
||||
|
@ -175,10 +174,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
|
|||
strcpy(s->name, "tag1s");
|
||||
}
|
||||
|
||||
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
||||
void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
|
||||
static int32_t vgVersion = ctgTestVgVersion + 1;
|
||||
int32_t vgNum = 0;
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
|
||||
|
||||
dbVgroup->vgVersion = vgVersion++;
|
||||
|
||||
|
@ -186,7 +186,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
|||
|
||||
dbVgroup->hashMethod = 0;
|
||||
dbVgroup->dbId = ctgTestDbId;
|
||||
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
|
||||
uint32_t hashUnit = UINT32_MAX / vgNum;
|
||||
|
@ -203,8 +203,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
|
|||
addr->port = htons(n + 22);
|
||||
}
|
||||
|
||||
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
|
||||
taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
|
||||
}
|
||||
|
||||
*pdbVgroup = dbVgroup;
|
||||
}
|
||||
|
||||
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
|
@ -250,7 +252,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestTablename);
|
||||
rspMsg->numOfTags = 0;
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -285,8 +288,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
|
||||
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestCTablename);
|
||||
strcpy(rspMsg->stbName, ctgTestSTablename);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -327,8 +331,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
strcpy(rspMsg->tbName, ctgTestSTablename);
|
||||
strcpy(rspMsg->stbName, ctgTestSTablename);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -370,8 +375,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
|
|||
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
|
||||
pRsp->pCont = calloc(1, pRsp->contLen);
|
||||
rspMsg = (STableMetaRsp *)pRsp->pCont;
|
||||
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
|
||||
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
|
||||
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||
sprintf(rspMsg->tbName, "%s_%d", ctgTestSTablename, idx);
|
||||
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
|
||||
rspMsg->numOfTags = htonl(ctgTestTagNum);
|
||||
rspMsg->numOfColumns = htonl(ctgTestColNum);
|
||||
rspMsg->precision = 1;
|
||||
|
@ -589,12 +595,12 @@ void *ctgTestGetDbVgroupThread(void *param) {
|
|||
void *ctgTestSetDbVgroupThread(void *param) {
|
||||
struct SCatalog *pCtg = (struct SCatalog *)param;
|
||||
int32_t code = 0;
|
||||
SDBVgroupInfo dbVgroup = {0};
|
||||
SDBVgroupInfo *dbVgroup = NULL;
|
||||
int32_t n = 0;
|
||||
|
||||
while (!ctgTestStop) {
|
||||
ctgTestBuildDBVgroup(&dbVgroup);
|
||||
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
|
||||
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
|
||||
if (code) {
|
||||
assert(0);
|
||||
}
|
||||
|
@ -669,6 +675,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
TEST(tableMeta, normalTable) {
|
||||
struct SCatalog *pCtg = NULL;
|
||||
void *mockPointer = (void *)0x1;
|
||||
|
@ -741,7 +748,7 @@ TEST(tableMeta, normalTable) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -837,7 +844,7 @@ TEST(tableMeta, childTableCase) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -938,7 +945,8 @@ TEST(tableMeta, superTableCase) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -1062,9 +1070,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SVgroupInfo *pvgInfo = NULL;
|
||||
SDBVgroupInfo dbVgroup = {0};
|
||||
SDBVgroupInfo *dbVgroup = NULL;
|
||||
SArray *vgList = NULL;
|
||||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndNormalMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
@ -1099,7 +1109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
taosArrayDestroy(vgList);
|
||||
|
||||
ctgTestBuildDBVgroup(&dbVgroup);
|
||||
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
|
||||
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
|
@ -1169,6 +1179,7 @@ TEST(multiThread, getSetDbVgroupCase) {
|
|||
catalogDestroy();
|
||||
}
|
||||
|
||||
|
||||
TEST(multiThread, ctableMeta) {
|
||||
struct SCatalog *pCtg = NULL;
|
||||
void *mockPointer = (void *)0x1;
|
||||
|
@ -1178,6 +1189,8 @@ TEST(multiThread, ctableMeta) {
|
|||
SArray *vgList = NULL;
|
||||
ctgTestStop = false;
|
||||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndChildMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
@ -1212,11 +1225,13 @@ TEST(multiThread, ctableMeta) {
|
|||
}
|
||||
|
||||
ctgTestStop = true;
|
||||
sleep(1);
|
||||
sleep(2);
|
||||
|
||||
catalogDestroy();
|
||||
}
|
||||
|
||||
|
||||
|
||||
TEST(rentTest, allRent) {
|
||||
struct SCatalog *pCtg = NULL;
|
||||
void *mockPointer = (void *)0x1;
|
||||
|
@ -1229,6 +1244,8 @@ TEST(rentTest, allRent) {
|
|||
SSTableMetaVersion *stable = NULL;
|
||||
uint32_t num = 0;
|
||||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
@ -1273,7 +1290,7 @@ TEST(rentTest, allRent) {
|
|||
printf("%d - expired stableNum:%d\n", i, num);
|
||||
if (stable) {
|
||||
for (int32_t n = 0; n < num; ++n) {
|
||||
printf("suid:%" PRId64 ", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
|
||||
printf("suid:%" PRId64 ", dbFName:%s, stbName:%s, sversion:%d, tversion:%d\n", stable[n].suid, stable[n].dbFName, stable[n].stbName, stable[n].sversion, stable[n].tversion);
|
||||
}
|
||||
free(stable);
|
||||
stable = NULL;
|
||||
|
@ -1291,4 +1308,4 @@ int main(int argc, char **argv) {
|
|||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -150,7 +150,7 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const
|
|||
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
CHECK_OUT_OF_MEM(realTable);
|
||||
if (NULL != pDbName) {
|
||||
strncpy(realTable->dbName, pDbName->z, pDbName->n);
|
||||
strncpy(realTable->table.dbName, pDbName->z, pDbName->n);
|
||||
}
|
||||
strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
|
||||
return (SNode*)realTable;
|
||||
|
@ -288,9 +288,6 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
|
|||
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
CHECK_OUT_OF_MEM(select);
|
||||
select->isDistinct = isDistinct;
|
||||
if (NULL == pProjectionList) {
|
||||
select->isStar = true;
|
||||
}
|
||||
select->pProjectionList = pProjectionList;
|
||||
select->pFromTable = pTable;
|
||||
return (SNode*)select;
|
||||
|
|
|
@ -1,18 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// int32_t doTranslate() {
|
||||
|
||||
// }
|
|
@ -15,8 +15,9 @@
|
|||
|
||||
#include "parserImpl.h"
|
||||
|
||||
#include "ttoken.h"
|
||||
#include "astCreateContext.h"
|
||||
#include "parserInt.h"
|
||||
#include "ttoken.h"
|
||||
|
||||
typedef void* (*FMalloc)(size_t);
|
||||
typedef void (*FFree)(void*);
|
||||
|
@ -25,7 +26,7 @@ extern void* NewParseAlloc(FMalloc);
|
|||
extern void NewParse(void*, int, SToken, void*);
|
||||
extern void NewParseFree(void*, FFree);
|
||||
|
||||
uint32_t toNewTokenId(uint32_t tokenId) {
|
||||
static uint32_t toNewTokenId(uint32_t tokenId) {
|
||||
switch (tokenId) {
|
||||
case TK_UNION:
|
||||
return NEW_TK_UNION;
|
||||
|
@ -73,7 +74,7 @@ uint32_t toNewTokenId(uint32_t tokenId) {
|
|||
return tokenId;
|
||||
}
|
||||
|
||||
uint32_t getToken(const char* z, uint32_t* tokenId) {
|
||||
static uint32_t getToken(const char* z, uint32_t* tokenId) {
|
||||
uint32_t n = tGetToken(z, tokenId);
|
||||
*tokenId = toNewTokenId(*tokenId);
|
||||
return n;
|
||||
|
@ -137,3 +138,289 @@ abort_parse:
|
|||
pQuery->pRoot = cxt.pRootNode;
|
||||
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// typedef struct SNamespace {
|
||||
// int16_t level; // todo for correlated subquery
|
||||
// char dbName[TSDB_DB_NAME_LEN];
|
||||
// char tableAlias[TSDB_TABLE_NAME_LEN];
|
||||
// SHashObj* pColHash; // key is colname, value is index of STableMeta.schema
|
||||
// STableMeta* pMeta;
|
||||
// } SNamespace;
|
||||
|
||||
typedef struct STranslateContext {
|
||||
SParseContext* pParseCxt;
|
||||
int32_t errCode;
|
||||
SMsgBuf msgBuf;
|
||||
SArray* pNsLevel; // element is SArray*, the element of this subarray is STableNode*
|
||||
int32_t currLevel;
|
||||
} STranslateContext;
|
||||
|
||||
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
|
||||
|
||||
static char* getSyntaxErrFormat(int32_t errCode) {
|
||||
switch (errCode) {
|
||||
case TSDB_CODE_PARSER_INVALID_COLUMN:
|
||||
return "Invalid column name : %s";
|
||||
case TSDB_CODE_PARSER_TABLE_NOT_EXIST:
|
||||
return "Table does not exist : %s";
|
||||
case TSDB_CODE_PARSER_AMBIGUOUS_COLUMN:
|
||||
return "Column ambiguously defined : %s";
|
||||
default:
|
||||
return "Unknown error";
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t generateSyntaxErrMsg(STranslateContext* pCxt, int32_t errCode, const char* additionalInfo) {
|
||||
snprintf(pCxt->msgBuf.buf, pCxt->msgBuf.len, getSyntaxErrFormat(errCode), additionalInfo);
|
||||
pCxt->errCode = errCode;
|
||||
return errCode;
|
||||
}
|
||||
|
||||
static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
|
||||
SArray* pTables = NULL;
|
||||
if (taosArrayGetSize(pCxt->pNsLevel) > pCxt->currLevel) {
|
||||
pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
|
||||
} else {
|
||||
pTables = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
}
|
||||
taosArrayPush(pTables, &pTable);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SName* toName(const SRealTableNode* pRealTable, SName* pName) {
|
||||
strncpy(pName->dbname, pRealTable->table.dbName, strlen(pRealTable->table.dbName));
|
||||
strncpy(pName->dbname, pRealTable->table.tableName, strlen(pRealTable->table.tableName));
|
||||
return pName;
|
||||
}
|
||||
|
||||
static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) {
|
||||
int cmp = 0;
|
||||
if ('\0' != pCol->dbName[0]) {
|
||||
cmp = strcmp(pCol->dbName, pTable->dbName);
|
||||
} else {
|
||||
cmp = strcmp(currentDb, pTable->dbName);
|
||||
}
|
||||
if (0 == cmp) {
|
||||
cmp = strcmp(pCol->tableAlias, pTable->tableAlias);
|
||||
}
|
||||
return (0 == cmp);
|
||||
}
|
||||
|
||||
static SNodeList* getProjectList(SNode* pNode) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pNode)) {
|
||||
return ((SSelectStmt*)pNode)->pProjectionList;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t createColumnNodeByTable(const STableNode* pTable, SNodeList* pList) {
|
||||
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
|
||||
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
|
||||
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
|
||||
for (int32_t i = 0; i < nums; ++i) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pCol->colId = pMeta->schema[i].colId;
|
||||
pCol->colType = pMeta->schema[i].type;
|
||||
pCol->node.resType.bytes = pMeta->schema[i].bytes;
|
||||
nodesListAppend(pList, (SNode*)pCol);
|
||||
}
|
||||
} else {
|
||||
SNodeList* pProjectList = getProjectList(((STempTableNode*)pTable)->pSubquery);
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pProjectList) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pCol->pProjectRef = (SNode*)pExpr;
|
||||
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
|
||||
pCol->node.resType = pExpr->resType;
|
||||
nodesListAppend(pList, (SNode*)pCol);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
|
||||
bool found = false;
|
||||
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
|
||||
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
|
||||
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
|
||||
for (int32_t i = 0; i < nums; ++i) {
|
||||
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
|
||||
pCol->colId = pMeta->schema[i].colId;
|
||||
pCol->colType = pMeta->schema[i].type;
|
||||
pCol->node.resType.bytes = pMeta->schema[i].bytes;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SNodeList* pProjectList = getProjectList(((STempTableNode*)pTable)->pSubquery);
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pProjectList) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
if (0 == strcmp(pCol->colName, pExpr->aliasName)) {
|
||||
pCol->pProjectRef = (SNode*)pExpr;
|
||||
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
|
||||
pCol->node.resType = pExpr->resType;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
static bool doTranslateExpr(SNode* pNode, void* pContext) {
|
||||
STranslateContext* pCxt = (STranslateContext*)pContext;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_COLUMN: {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
|
||||
size_t nums = taosArrayGetSize(pTables);
|
||||
bool hasTableAlias = ('\0' != pCol->tableAlias[0]);
|
||||
bool found = false;
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
if (hasTableAlias) {
|
||||
if (belongTable(pCxt->pParseCxt->db, pCol, pTable)) {
|
||||
if (findAndSetColumn(pCol, pTable)) {
|
||||
break;
|
||||
}
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_INVALID_COLUMN, pCol->colName);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (findAndSetColumn(pCol, pTable)) {
|
||||
if (found) {
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_AMBIGUOUS_COLUMN, pCol->colName);
|
||||
return false;
|
||||
}
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_VALUE:
|
||||
break; // todo check literal format
|
||||
case QUERY_NODE_OPERATOR: {
|
||||
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_FUNCTION:
|
||||
break; // todo
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
return translateSubquery(pCxt, ((STempTableNode*)pNode)->pSubquery);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t translateExpr(STranslateContext* pCxt, SNode* pNode) {
|
||||
nodesWalkNodePostOrder(pNode, doTranslateExpr, pCxt);
|
||||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
static int32_t translateExprList(STranslateContext* pCxt, SNodeList* pList) {
|
||||
nodesWalkListPostOrder(pList, doTranslateExpr, pCxt);
|
||||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pTable)) {
|
||||
case QUERY_NODE_REAL_TABLE: {
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
|
||||
if ('\0' == pRealTable->table.dbName[0]) {
|
||||
strcpy(pRealTable->table.dbName, pCxt->pParseCxt->db);
|
||||
}
|
||||
if ('\0' == pRealTable->table.tableAlias[0]) {
|
||||
strcpy(pRealTable->table.tableAlias, pRealTable->table.tableName);
|
||||
}
|
||||
SName name;
|
||||
code = catalogGetTableMeta(
|
||||
pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), toName(pRealTable, &name), &(pRealTable->pMeta));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_TABLE_NOT_EXIST, pRealTable->table.tableName);
|
||||
}
|
||||
code = addNamespace(pCxt, pRealTable);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_TEMP_TABLE: {
|
||||
STempTableNode* pTempTable = (STempTableNode*)pTable;
|
||||
code = translateSubquery(pCxt, pTempTable->pSubquery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNamespace(pCxt, pTempTable);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTable = (SJoinTableNode*)pTable;
|
||||
code = translateTable(pCxt, pJoinTable->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateTable(pCxt, pJoinTable->pRight);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, pJoinTable->pOnCond);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool* pIsSelectStar) {
|
||||
if (NULL == pSelect->pProjectionList) { // select * ...
|
||||
SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
|
||||
size_t nums = taosArrayGetSize(pTables);
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
createColumnNodeByTable(pTable, pSelect->pProjectionList);
|
||||
}
|
||||
*pIsSelectStar = true;
|
||||
} else {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
code = translateTable(pCxt, pSelect->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, pSelect->pWhere);
|
||||
}
|
||||
bool isSelectStar = false;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateStar(pCxt, pSelect, &isSelectStar);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !isSelectStar) {
|
||||
code = translateExprList(pCxt, pSelect->pProjectionList);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
code = translateSelect(pCxt, (SSelectStmt*)pNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
|
||||
++(pCxt->currLevel);
|
||||
int32_t code = translateQuery(pCxt, pNode);
|
||||
--(pCxt->currLevel);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||
STranslateContext cxt = { .pParseCxt = pParseCxt, .pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES), .currLevel = 0 };
|
||||
return translateQuery(&cxt, pQuery->pRoot);
|
||||
}
|
||||
|
|
|
@ -71,8 +71,8 @@ private:
|
|||
switch (nodeType(node)) {
|
||||
case QUERY_NODE_REAL_TABLE: {
|
||||
SRealTableNode* realTable = (SRealTableNode*)table;
|
||||
if ('\0' != realTable->dbName[0]) {
|
||||
sql.append(realTable->dbName);
|
||||
if ('\0' != realTable->table.dbName[0]) {
|
||||
sql.append(realTable->table.dbName);
|
||||
sql.append(".");
|
||||
}
|
||||
sql.append(realTable->table.tableName);
|
||||
|
|
|
@ -45,11 +45,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
|||
|
||||
bMsg->header.vgId = htonl(bInput->vgId);
|
||||
|
||||
if (bInput->dbName) {
|
||||
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
|
||||
if (bInput->dbFName) {
|
||||
tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
|
||||
}
|
||||
|
||||
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
|
||||
tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
|
||||
|
||||
*msgLen = (int32_t)sizeof(*bMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -113,12 +113,19 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
|
||||
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
|
||||
pOut->dbVgroup.dbId = pRsp->uid;
|
||||
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pOut->dbVgroup.vgInfo) {
|
||||
qError("hash init[%d] failed", pRsp->vgNum);
|
||||
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
|
||||
if (NULL == pOut->dbVgroup) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pOut->dbVgroup->vgVersion = pRsp->vgVersion;
|
||||
pOut->dbVgroup->hashMethod = pRsp->hashMethod;
|
||||
pOut->dbVgroup->dbId = pRsp->uid;
|
||||
pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pOut->dbVgroup->vgHash) {
|
||||
qError("taosHashInit %d failed", pRsp->vgNum);
|
||||
tfree(pOut->dbVgroup);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -131,8 +138,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
qError("hash push failed");
|
||||
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
|
||||
qError("taosHashPut failed");
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
@ -142,8 +149,10 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return code;
|
||||
|
||||
_return:
|
||||
|
||||
if (pOut) {
|
||||
tfree(pOut->dbVgroup.vgInfo);
|
||||
taosHashCleanup(pOut->dbVgroup->vgHash);
|
||||
tfree(pOut->dbVgroup);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -248,16 +257,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
strcpy(pOut->dbFName, pMetaMsg->dbFName);
|
||||
|
||||
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
|
||||
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
|
||||
|
||||
if (pMetaMsg->dbFname[0]) {
|
||||
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
|
||||
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
|
||||
} else {
|
||||
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
|
||||
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
|
||||
}
|
||||
strcpy(pOut->ctbName, pMetaMsg->tbName);
|
||||
strcpy(pOut->tbName, pMetaMsg->stbName);
|
||||
|
||||
pOut->ctbMeta.vgId = pMetaMsg->vgId;
|
||||
pOut->ctbMeta.tableType = pMetaMsg->tableType;
|
||||
|
@ -268,11 +274,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
} else {
|
||||
SET_META_TYPE_TABLE(pOut->metaType);
|
||||
|
||||
if (pMetaMsg->dbFname[0]) {
|
||||
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
|
||||
} else {
|
||||
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
|
||||
}
|
||||
strcpy(pOut->tbName, pMetaMsg->tbName);
|
||||
|
||||
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
}
|
||||
|
@ -291,4 +293,4 @@ void initQueryModuleMsgHandle() {
|
|||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -132,7 +132,7 @@ typedef struct SSchJob {
|
|||
SQueryProfileSummary summary;
|
||||
} SSchJob;
|
||||
|
||||
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
|
||||
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
|
||||
|
||||
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
|
||||
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
|
||||
|
|
|
@ -773,14 +773,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||
pErrTask = par;
|
||||
|
||||
atomic_add_fetch_32(&par->childReady, 1);
|
||||
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &par->lock);
|
||||
SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
|
||||
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &par->lock);
|
||||
|
||||
if (SCH_TASK_READY_TO_LUNCH(par)) {
|
||||
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
|
||||
SCH_ERR_RET(schLaunchTask(pJob, par));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
}
|
||||
|
||||
if (pRead->pHead->head.version != ver) {
|
||||
/*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/
|
||||
wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
|
@ -178,7 +178,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
|
||||
code = walValidBodyCksum(pRead->pHead);
|
||||
if (code != 0) {
|
||||
/*wError("unexpected wal log version: checksum not passed");*/
|
||||
wError("unexpected wal log version: checksum not passed");
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
|
|
|
@ -15,68 +15,101 @@
|
|||
|
||||
#include "nodes.h"
|
||||
|
||||
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
|
||||
typedef enum ETraversalOrder {
|
||||
TRAVERSAL_PREORDER = 1,
|
||||
TRAVERSAL_POSTORDER
|
||||
} ETraversalOrder;
|
||||
|
||||
bool nodesWalkNodeList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext) {
|
||||
static bool walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext);
|
||||
|
||||
static bool walkNode(SNode* pNode, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) {
|
||||
if (NULL == pNode) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (TRAVERSAL_PREORDER == order && !walker(pNode, pContext)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool res = true;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_COLUMN:
|
||||
case QUERY_NODE_VALUE:
|
||||
case QUERY_NODE_LIMIT:
|
||||
// these node types with no subnodes
|
||||
break;
|
||||
case QUERY_NODE_OPERATOR: {
|
||||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
res = walkNode(pOpNode->pLeft, order, walker, pContext);
|
||||
if (res) {
|
||||
res = walkNode(pOpNode->pRight, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_CONDITION:
|
||||
res = walkList(((SLogicConditionNode*)pNode)->pParameterList, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_IS_NULL_CONDITION:
|
||||
res = walkNode(((SIsNullCondNode*)pNode)->pExpr, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_FUNCTION:
|
||||
res = walkList(((SFunctionNode*)pNode)->pParameterList, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
break; // todo
|
||||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
|
||||
res = walkNode(pJoinTableNode->pLeft, order, walker, pContext);
|
||||
if (res) {
|
||||
res = walkNode(pJoinTableNode->pRight, order, walker, pContext);
|
||||
}
|
||||
if (res) {
|
||||
res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_GROUPING_SET:
|
||||
res = walkList(((SGroupingSetNode*)pNode)->pParameterList, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_ORDER_BY_EXPR:
|
||||
res = walkNode(((SOrderByExprNode*)pNode)->pExpr, order, walker, pContext);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (res && TRAVERSAL_POSTORDER == order) {
|
||||
res = walker(pNode, pContext);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) {
|
||||
SNode* node;
|
||||
FOREACH(node, pNodeList) {
|
||||
if (!nodesWalkNode(node, walker, pContext)) {
|
||||
if (!walkNode(node, order, walker, pContext)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||
if (NULL == pNode) {
|
||||
return true;
|
||||
}
|
||||
void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||
(void)walkNode(pNode, TRAVERSAL_PREORDER, walker, pContext);
|
||||
}
|
||||
|
||||
if (!walker(pNode, pContext)) {
|
||||
return false;
|
||||
}
|
||||
void nodesWalkList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext) {
|
||||
(void)walkList(pNodeList, TRAVERSAL_PREORDER, walker, pContext);
|
||||
}
|
||||
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_COLUMN:
|
||||
case QUERY_NODE_VALUE:
|
||||
case QUERY_NODE_LIMIT:
|
||||
// these node types with no subnodes
|
||||
return true;
|
||||
case QUERY_NODE_OPERATOR: {
|
||||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
if (!nodesWalkNode(pOpNode->pLeft, walker, pContext)) {
|
||||
return false;
|
||||
}
|
||||
return nodesWalkNode(pOpNode->pRight, walker, pContext);
|
||||
}
|
||||
case QUERY_NODE_LOGIC_CONDITION:
|
||||
return nodesWalkNodeList(((SLogicConditionNode*)pNode)->pParameterList, walker, pContext);
|
||||
case QUERY_NODE_IS_NULL_CONDITION:
|
||||
return nodesWalkNode(((SIsNullCondNode*)pNode)->pExpr, walker, pContext);
|
||||
case QUERY_NODE_FUNCTION:
|
||||
return nodesWalkNodeList(((SFunctionNode*)pNode)->pParameterList, walker, pContext);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
return true; // todo
|
||||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
|
||||
if (!nodesWalkNode(pJoinTableNode->pLeft, walker, pContext)) {
|
||||
return false;
|
||||
}
|
||||
if (!nodesWalkNode(pJoinTableNode->pRight, walker, pContext)) {
|
||||
return false;
|
||||
}
|
||||
return nodesWalkNode(pJoinTableNode->pOnCond, walker, pContext);
|
||||
}
|
||||
case QUERY_NODE_GROUPING_SET:
|
||||
return nodesWalkNodeList(((SGroupingSetNode*)pNode)->pParameterList, walker, pContext);
|
||||
case QUERY_NODE_ORDER_BY_EXPR:
|
||||
return nodesWalkNode(((SOrderByExprNode*)pNode)->pExpr, walker, pContext);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||
(void)walkNode(pNode, TRAVERSAL_POSTORDER, walker, pContext);
|
||||
}
|
||||
|
||||
return false;
|
||||
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext) {
|
||||
(void)walkList(pList, TRAVERSAL_PREORDER, walker, pContext);
|
||||
}
|
||||
|
||||
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
#include "osString.h"
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ target_link_libraries(
|
|||
util
|
||||
PRIVATE os
|
||||
PUBLIC lz4_static
|
||||
PUBLIC api
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tbuffer.h"
|
||||
#include "exception.h"
|
||||
#include "os.h"
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#include "tnote.h"
|
||||
#include "tutil.h"
|
||||
#include "ulog.h"
|
||||
//#include "zlib.h"
|
||||
|
||||
#define MAX_LOGLINE_SIZE (1000)
|
||||
#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10)
|
||||
|
|
|
@ -1,3 +1,20 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tpagedfile.h"
|
||||
#include "thash.h"
|
||||
#include "stddef.h"
|
||||
|
|
Loading…
Reference in New Issue